6528fdd3e9190106287a44bab794e37bf311ea8f
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex = INIT_MUTEX_VAR;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif          
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418         if (shutting_down_scheduler) {
419             IF_DEBUG(scheduler, sched_belch("shutting down"));
420             // If we are a worker, just exit.  If we're a bound thread
421             // then we will exit below when we've removed our TSO from
422             // the run queue.
423             if (task->tso == NULL && emptyRunQueue(cap)) {
424                 return cap;
425             }
426         } else {
427             IF_DEBUG(scheduler, sched_belch("interrupted"));
428         }
429     }
430
431 #if defined(not_yet) && defined(SMP)
432     //
433     // Top up the run queue from our spark pool.  We try to make the
434     // number of threads in the run queue equal to the number of
435     // free capabilities.
436     //
437     {
438         StgClosure *spark;
439         if (emptyRunQueue()) {
440             spark = findSpark(rtsFalse);
441             if (spark == NULL) {
442                 break; /* no more sparks in the pool */
443             } else {
444                 createSparkThread(spark);         
445                 IF_DEBUG(scheduler,
446                          sched_belch("==^^ turning spark of closure %p into a thread",
447                                      (StgClosure *)spark));
448             }
449         }
450     }
451 #endif // SMP
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464
465     // Normally, the only way we can get here with no threads to
466     // run is if a keyboard interrupt received during 
467     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
468     // Additionally, it is not fatal for the
469     // threaded RTS to reach here with no threads to run.
470     //
471     // win32: might be here due to awaitEvent() being abandoned
472     // as a result of a console event having been delivered.
473     if ( emptyRunQueue(cap) ) {
474 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
475         ASSERT(interrupted);
476 #endif
477         continue; // nothing to do
478     }
479
480 #if defined(PARALLEL_HASKELL)
481     scheduleSendPendingMessages();
482     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
483         continue;
484
485 #if defined(SPARKS)
486     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
487 #endif
488
489     /* If we still have no work we need to send a FISH to get a spark
490        from another PE */
491     if (emptyRunQueue(cap)) {
492         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
493         ASSERT(rtsFalse); // should not happen at the moment
494     }
495     // from here: non-empty run queue.
496     //  TODO: merge above case with this, only one call processMessages() !
497     if (PacketsWaiting()) {  /* process incoming messages, if
498                                 any pending...  only in else
499                                 because getRemoteWork waits for
500                                 messages as well */
501         receivedFinish = processMessages();
502     }
503 #endif
504
505 #if defined(GRAN)
506     scheduleProcessEvent(event);
507 #endif
508
509     // 
510     // Get a thread to run
511     //
512     t = popRunQueue(cap);
513
514 #if defined(GRAN) || defined(PAR)
515     scheduleGranParReport(); // some kind of debuging output
516 #else
517     // Sanity check the thread we're about to run.  This can be
518     // expensive if there is lots of thread switching going on...
519     IF_DEBUG(sanity,checkTSO(t));
520 #endif
521
522 #if defined(THREADED_RTS)
523     // Check whether we can run this thread in the current task.
524     // If not, we have to pass our capability to the right task.
525     {
526         Task *bound = t->bound;
527       
528         if (bound) {
529             if (bound == task) {
530                 IF_DEBUG(scheduler,
531                          sched_belch("### Running thread %d in bound thread",
532                                      t->id));
533                 // yes, the Haskell thread is bound to the current native thread
534             } else {
535                 IF_DEBUG(scheduler,
536                          sched_belch("### thread %d bound to another OS thread",
537                                      t->id));
538                 // no, bound to a different Haskell thread: pass to that thread
539                 pushOnRunQueue(cap,t);
540                 continue;
541             }
542         } else {
543             // The thread we want to run is unbound.
544             if (task->tso) { 
545                 IF_DEBUG(scheduler,
546                          sched_belch("### this OS thread cannot run thread %d", t->id));
547                 // no, the current native thread is bound to a different
548                 // Haskell thread, so pass it to any worker thread
549                 pushOnRunQueue(cap,t);
550                 continue; 
551             }
552         }
553     }
554 #endif
555
556     cap->r.rCurrentTSO = t;
557     
558     /* context switches are initiated by the timer signal, unless
559      * the user specified "context switch as often as possible", with
560      * +RTS -C0
561      */
562     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
563         && !emptyThreadQueues(cap)) {
564         context_switch = 1;
565     }
566          
567 run_thread:
568
569     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]));
571
572 #if defined(PROFILING)
573     startHeapProfTimer();
574 #endif
575
576     // ----------------------------------------------------------------------
577     // Run the current thread 
578
579     prev_what_next = t->what_next;
580
581     errno = t->saved_errno;
582     cap->in_haskell = rtsTrue;
583
584     recent_activity = ACTIVITY_YES;
585
586     switch (prev_what_next) {
587         
588     case ThreadKilled:
589     case ThreadComplete:
590         /* Thread already finished, return to scheduler. */
591         ret = ThreadFinished;
592         break;
593         
594     case ThreadRunGHC:
595     {
596         StgRegTable *r;
597         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
598         cap = regTableToCapability(r);
599         ret = r->rRet;
600         break;
601     }
602     
603     case ThreadInterpret:
604         cap = interpretBCO(cap);
605         ret = cap->r.rRet;
606         break;
607         
608     default:
609         barf("schedule: invalid what_next field");
610     }
611
612     cap->in_haskell = rtsFalse;
613
614     // The TSO might have moved, eg. if it re-entered the RTS and a GC
615     // happened.  So find the new location:
616     t = cap->r.rCurrentTSO;
617
618 #ifdef SMP
619     // If ret is ThreadBlocked, and this Task is bound to the TSO that
620     // blocked, we are in limbo - the TSO is now owned by whatever it
621     // is blocked on, and may in fact already have been woken up,
622     // perhaps even on a different Capability.  It may be the case
623     // that task->cap != cap.  We better yield this Capability
624     // immediately and return to normaility.
625     if (ret == ThreadBlocked) {
626         IF_DEBUG(scheduler,
627                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
628                             t->id, whatNext_strs[t->what_next]));
629         continue;
630     }
631 #endif
632
633     ASSERT_CAPABILITY_INVARIANTS(cap,task);
634
635     // And save the current errno in this thread.
636     t->saved_errno = errno;
637
638     // ----------------------------------------------------------------------
639     
640     // Costs for the scheduler are assigned to CCS_SYSTEM
641 #if defined(PROFILING)
642     stopHeapProfTimer();
643     CCCS = CCS_SYSTEM;
644 #endif
645     
646     // We have run some Haskell code: there might be blackhole-blocked
647     // threads to wake up now.
648     // Lock-free test here should be ok, we're just setting a flag.
649     if ( blackhole_queue != END_TSO_QUEUE ) {
650         blackholes_need_checking = rtsTrue;
651     }
652     
653 #if defined(THREADED_RTS)
654     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
655 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
656     IF_DEBUG(scheduler,debugBelch("sched: "););
657 #endif
658     
659     schedulePostRunThread();
660
661     ready_to_gc = rtsFalse;
662
663     switch (ret) {
664     case HeapOverflow:
665         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
666         break;
667
668     case StackOverflow:
669         scheduleHandleStackOverflow(cap,task,t);
670         break;
671
672     case ThreadYielding:
673         if (scheduleHandleYield(cap, t, prev_what_next)) {
674             // shortcut for switching between compiler/interpreter:
675             goto run_thread; 
676         }
677         break;
678
679     case ThreadBlocked:
680         scheduleHandleThreadBlocked(t);
681         break;
682
683     case ThreadFinished:
684         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
685         ASSERT_CAPABILITY_INVARIANTS(cap,task);
686         break;
687
688     default:
689       barf("schedule: invalid thread return code %d", (int)ret);
690     }
691
692     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
693     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
694   } /* end of while() */
695
696   IF_PAR_DEBUG(verbose,
697                debugBelch("== Leaving schedule() after having received Finish\n"));
698 }
699
700 /* ----------------------------------------------------------------------------
701  * Setting up the scheduler loop
702  * ------------------------------------------------------------------------- */
703
704 static void
705 schedulePreLoop(void)
706 {
707 #if defined(GRAN) 
708     /* set up first event to get things going */
709     /* ToDo: assign costs for system setup and init MainTSO ! */
710     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
711               ContinueThread, 
712               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
713     
714     IF_DEBUG(gran,
715              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
716                         CurrentTSO);
717              G_TSO(CurrentTSO, 5));
718     
719     if (RtsFlags.GranFlags.Light) {
720         /* Save current time; GranSim Light only */
721         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
722     }      
723 #endif
724 }
725
726 /* -----------------------------------------------------------------------------
727  * schedulePushWork()
728  *
729  * Push work to other Capabilities if we have some.
730  * -------------------------------------------------------------------------- */
731
732 #ifdef SMP
733 static void
734 schedulePushWork(Capability *cap USED_WHEN_SMP, 
735                  Task *task      USED_WHEN_SMP)
736 {
737     Capability *free_caps[n_capabilities], *cap0;
738     nat i, n_free_caps;
739
740     // Check whether we have more threads on our run queue that we
741     // could hand to another Capability.
742     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
774
775         prev = cap->run_queue_hd;
776         t = prev->link;
777         prev->link = END_TSO_QUEUE;
778         i = 0;
779         for (; t != END_TSO_QUEUE; t = next) {
780             next = t->link;
781             t->link = END_TSO_QUEUE;
782             if (t->what_next == ThreadRelocated) {
783                 prev->link = t;
784                 prev = t;
785             } else if (i == n_free_caps) {
786                 i = 0;
787                 // keep one for us
788                 prev->link = t;
789                 prev = t;
790             } else {
791                 appendToRunQueue(free_caps[i],t);
792                 if (t->bound) { t->bound->cap = free_caps[i]; }
793                 i++;
794             }
795         }
796         cap->run_queue_tl = prev;
797
798         // release the capabilities
799         for (i = 0; i < n_free_caps; i++) {
800             task->cap = free_caps[i];
801             releaseCapability(free_caps[i]);
802         }
803     }
804     task->cap = cap; // reset to point to our Capability.
805 }
806 #endif
807
808 /* ----------------------------------------------------------------------------
809  * Start any pending signal handlers
810  * ------------------------------------------------------------------------- */
811
812 static void
813 scheduleStartSignalHandlers(Capability *cap)
814 {
815 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
816     if (signals_pending()) { // safe outside the lock
817         startSignalHandlers(cap);
818     }
819 #endif
820 }
821
822 /* ----------------------------------------------------------------------------
823  * Check for blocked threads that can be woken up.
824  * ------------------------------------------------------------------------- */
825
826 static void
827 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
828 {
829 #if !defined(THREADED_RTS)
830     //
831     // Check whether any waiting threads need to be woken up.  If the
832     // run queue is empty, and there are no other tasks running, we
833     // can wait indefinitely for something to happen.
834     //
835     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
836     {
837         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
838     }
839 #endif
840 }
841
842
843 /* ----------------------------------------------------------------------------
844  * Check for threads blocked on BLACKHOLEs that can be woken up
845  * ------------------------------------------------------------------------- */
846 static void
847 scheduleCheckBlackHoles (Capability *cap)
848 {
849     if ( blackholes_need_checking ) // check without the lock first
850     {
851         ACQUIRE_LOCK(&sched_mutex);
852         if ( blackholes_need_checking ) {
853             checkBlackHoles(cap);
854             blackholes_need_checking = rtsFalse;
855         }
856         RELEASE_LOCK(&sched_mutex);
857     }
858 }
859
860 /* ----------------------------------------------------------------------------
861  * Detect deadlock conditions and attempt to resolve them.
862  * ------------------------------------------------------------------------- */
863
864 static void
865 scheduleDetectDeadlock (Capability *cap, Task *task)
866 {
867
868 #if defined(PARALLEL_HASKELL)
869     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
870     return;
871 #endif
872
873     /* 
874      * Detect deadlock: when we have no threads to run, there are no
875      * threads blocked, waiting for I/O, or sleeping, and all the
876      * other tasks are waiting for work, we must have a deadlock of
877      * some description.
878      */
879     if ( emptyThreadQueues(cap) )
880     {
881 #if defined(THREADED_RTS)
882         /* 
883          * In the threaded RTS, we only check for deadlock if there
884          * has been no activity in a complete timeslice.  This means
885          * we won't eagerly start a full GC just because we don't have
886          * any threads to run currently.
887          */
888         if (recent_activity != ACTIVITY_INACTIVE) return;
889 #endif
890
891         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
892
893         // Garbage collection can release some new threads due to
894         // either (a) finalizers or (b) threads resurrected because
895         // they are unreachable and will therefore be sent an
896         // exception.  Any threads thus released will be immediately
897         // runnable.
898         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
899         recent_activity = ACTIVITY_DONE_GC;
900         
901         if ( !emptyRunQueue(cap) ) return;
902
903 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
904         /* If we have user-installed signal handlers, then wait
905          * for signals to arrive rather then bombing out with a
906          * deadlock.
907          */
908         if ( anyUserHandlers() ) {
909             IF_DEBUG(scheduler, 
910                      sched_belch("still deadlocked, waiting for signals..."));
911
912             awaitUserSignals();
913
914             if (signals_pending()) {
915                 startSignalHandlers(cap);
916             }
917
918             // either we have threads to run, or we were interrupted:
919             ASSERT(!emptyRunQueue(cap) || interrupted);
920         }
921 #endif
922
923 #if !defined(THREADED_RTS)
924         /* Probably a real deadlock.  Send the current main thread the
925          * Deadlock exception.
926          */
927         if (task->tso) {
928             switch (task->tso->why_blocked) {
929             case BlockedOnSTM:
930             case BlockedOnBlackHole:
931             case BlockedOnException:
932             case BlockedOnMVar:
933                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
934                 return;
935             default:
936                 barf("deadlock: main thread blocked in a strange way");
937             }
938         }
939         return;
940 #endif
941     }
942 }
943
944 /* ----------------------------------------------------------------------------
945  * Process an event (GRAN only)
946  * ------------------------------------------------------------------------- */
947
948 #if defined(GRAN)
949 static StgTSO *
950 scheduleProcessEvent(rtsEvent *event)
951 {
952     StgTSO *t;
953
954     if (RtsFlags.GranFlags.Light)
955       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
956
957     /* adjust time based on time-stamp */
958     if (event->time > CurrentTime[CurrentProc] &&
959         event->evttype != ContinueThread)
960       CurrentTime[CurrentProc] = event->time;
961     
962     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
963     if (!RtsFlags.GranFlags.Light)
964       handleIdlePEs();
965
966     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
967
968     /* main event dispatcher in GranSim */
969     switch (event->evttype) {
970       /* Should just be continuing execution */
971     case ContinueThread:
972       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
973       /* ToDo: check assertion
974       ASSERT(run_queue_hd != (StgTSO*)NULL &&
975              run_queue_hd != END_TSO_QUEUE);
976       */
977       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
978       if (!RtsFlags.GranFlags.DoAsyncFetch &&
979           procStatus[CurrentProc]==Fetching) {
980         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
981               CurrentTSO->id, CurrentTSO, CurrentProc);
982         goto next_thread;
983       } 
984       /* Ignore ContinueThreads for completed threads */
985       if (CurrentTSO->what_next == ThreadComplete) {
986         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
987               CurrentTSO->id, CurrentTSO, CurrentProc);
988         goto next_thread;
989       } 
990       /* Ignore ContinueThreads for threads that are being migrated */
991       if (PROCS(CurrentTSO)==Nowhere) { 
992         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
993               CurrentTSO->id, CurrentTSO, CurrentProc);
994         goto next_thread;
995       }
996       /* The thread should be at the beginning of the run queue */
997       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
998         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
999               CurrentTSO->id, CurrentTSO, CurrentProc);
1000         break; // run the thread anyway
1001       }
1002       /*
1003       new_event(proc, proc, CurrentTime[proc],
1004                 FindWork,
1005                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1006       goto next_thread; 
1007       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1008       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1009
1010     case FetchNode:
1011       do_the_fetchnode(event);
1012       goto next_thread;             /* handle next event in event queue  */
1013       
1014     case GlobalBlock:
1015       do_the_globalblock(event);
1016       goto next_thread;             /* handle next event in event queue  */
1017       
1018     case FetchReply:
1019       do_the_fetchreply(event);
1020       goto next_thread;             /* handle next event in event queue  */
1021       
1022     case UnblockThread:   /* Move from the blocked queue to the tail of */
1023       do_the_unblock(event);
1024       goto next_thread;             /* handle next event in event queue  */
1025       
1026     case ResumeThread:  /* Move from the blocked queue to the tail of */
1027       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1028       event->tso->gran.blocktime += 
1029         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1030       do_the_startthread(event);
1031       goto next_thread;             /* handle next event in event queue  */
1032       
1033     case StartThread:
1034       do_the_startthread(event);
1035       goto next_thread;             /* handle next event in event queue  */
1036       
1037     case MoveThread:
1038       do_the_movethread(event);
1039       goto next_thread;             /* handle next event in event queue  */
1040       
1041     case MoveSpark:
1042       do_the_movespark(event);
1043       goto next_thread;             /* handle next event in event queue  */
1044       
1045     case FindWork:
1046       do_the_findwork(event);
1047       goto next_thread;             /* handle next event in event queue  */
1048       
1049     default:
1050       barf("Illegal event type %u\n", event->evttype);
1051     }  /* switch */
1052     
1053     /* This point was scheduler_loop in the old RTS */
1054
1055     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1056
1057     TimeOfLastEvent = CurrentTime[CurrentProc];
1058     TimeOfNextEvent = get_time_of_next_event();
1059     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1060     // CurrentTSO = ThreadQueueHd;
1061
1062     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1063                          TimeOfNextEvent));
1064
1065     if (RtsFlags.GranFlags.Light) 
1066       GranSimLight_leave_system(event, &ActiveTSO); 
1067
1068     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1069
1070     IF_DEBUG(gran, 
1071              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1072
1073     /* in a GranSim setup the TSO stays on the run queue */
1074     t = CurrentTSO;
1075     /* Take a thread from the run queue. */
1076     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1077
1078     IF_DEBUG(gran, 
1079              debugBelch("GRAN: About to run current thread, which is\n");
1080              G_TSO(t,5));
1081
1082     context_switch = 0; // turned on via GranYield, checking events and time slice
1083
1084     IF_DEBUG(gran, 
1085              DumpGranEvent(GR_SCHEDULE, t));
1086
1087     procStatus[CurrentProc] = Busy;
1088 }
1089 #endif // GRAN
1090
1091 /* ----------------------------------------------------------------------------
1092  * Send pending messages (PARALLEL_HASKELL only)
1093  * ------------------------------------------------------------------------- */
1094
1095 #if defined(PARALLEL_HASKELL)
1096 static StgTSO *
1097 scheduleSendPendingMessages(void)
1098 {
1099     StgSparkPool *pool;
1100     rtsSpark spark;
1101     StgTSO *t;
1102
1103 # if defined(PAR) // global Mem.Mgmt., omit for now
1104     if (PendingFetches != END_BF_QUEUE) {
1105         processFetches();
1106     }
1107 # endif
1108     
1109     if (RtsFlags.ParFlags.BufferTime) {
1110         // if we use message buffering, we must send away all message
1111         // packets which have become too old...
1112         sendOldBuffers(); 
1113     }
1114 }
1115 #endif
1116
1117 /* ----------------------------------------------------------------------------
1118  * Activate spark threads (PARALLEL_HASKELL only)
1119  * ------------------------------------------------------------------------- */
1120
1121 #if defined(PARALLEL_HASKELL)
1122 static void
1123 scheduleActivateSpark(void)
1124 {
1125 #if defined(SPARKS)
1126   ASSERT(emptyRunQueue());
1127 /* We get here if the run queue is empty and want some work.
1128    We try to turn a spark into a thread, and add it to the run queue,
1129    from where it will be picked up in the next iteration of the scheduler
1130    loop.
1131 */
1132
1133       /* :-[  no local threads => look out for local sparks */
1134       /* the spark pool for the current PE */
1135       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1136       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1137           pool->hd < pool->tl) {
1138         /* 
1139          * ToDo: add GC code check that we really have enough heap afterwards!!
1140          * Old comment:
1141          * If we're here (no runnable threads) and we have pending
1142          * sparks, we must have a space problem.  Get enough space
1143          * to turn one of those pending sparks into a
1144          * thread... 
1145          */
1146
1147         spark = findSpark(rtsFalse);            /* get a spark */
1148         if (spark != (rtsSpark) NULL) {
1149           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1150           IF_PAR_DEBUG(fish, // schedule,
1151                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1152                              tso->id, tso, advisory_thread_count));
1153
1154           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1155             IF_PAR_DEBUG(fish, // schedule,
1156                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1157                             spark));
1158             return rtsFalse; /* failed to generate a thread */
1159           }                  /* otherwise fall through & pick-up new tso */
1160         } else {
1161           IF_PAR_DEBUG(fish, // schedule,
1162                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1163                              spark_queue_len(pool)));
1164           return rtsFalse;  /* failed to generate a thread */
1165         }
1166         return rtsTrue;  /* success in generating a thread */
1167   } else { /* no more threads permitted or pool empty */
1168     return rtsFalse;  /* failed to generateThread */
1169   }
1170 #else
1171   tso = NULL; // avoid compiler warning only
1172   return rtsFalse;  /* dummy in non-PAR setup */
1173 #endif // SPARKS
1174 }
1175 #endif // PARALLEL_HASKELL
1176
1177 /* ----------------------------------------------------------------------------
1178  * Get work from a remote node (PARALLEL_HASKELL only)
1179  * ------------------------------------------------------------------------- */
1180     
1181 #if defined(PARALLEL_HASKELL)
1182 static rtsBool
1183 scheduleGetRemoteWork(rtsBool *receivedFinish)
1184 {
1185   ASSERT(emptyRunQueue());
1186
1187   if (RtsFlags.ParFlags.BufferTime) {
1188         IF_PAR_DEBUG(verbose, 
1189                 debugBelch("...send all pending data,"));
1190         {
1191           nat i;
1192           for (i=1; i<=nPEs; i++)
1193             sendImmediately(i); // send all messages away immediately
1194         }
1195   }
1196 # ifndef SPARKS
1197         //++EDEN++ idle() , i.e. send all buffers, wait for work
1198         // suppress fishing in EDEN... just look for incoming messages
1199         // (blocking receive)
1200   IF_PAR_DEBUG(verbose, 
1201                debugBelch("...wait for incoming messages...\n"));
1202   *receivedFinish = processMessages(); // blocking receive...
1203
1204         // and reenter scheduling loop after having received something
1205         // (return rtsFalse below)
1206
1207 # else /* activate SPARKS machinery */
1208 /* We get here, if we have no work, tried to activate a local spark, but still
1209    have no work. We try to get a remote spark, by sending a FISH message.
1210    Thread migration should be added here, and triggered when a sequence of 
1211    fishes returns without work. */
1212         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1213
1214       /* =8-[  no local sparks => look for work on other PEs */
1215         /*
1216          * We really have absolutely no work.  Send out a fish
1217          * (there may be some out there already), and wait for
1218          * something to arrive.  We clearly can't run any threads
1219          * until a SCHEDULE or RESUME arrives, and so that's what
1220          * we're hoping to see.  (Of course, we still have to
1221          * respond to other types of messages.)
1222          */
1223         rtsTime now = msTime() /*CURRENT_TIME*/;
1224         IF_PAR_DEBUG(verbose, 
1225                      debugBelch("--  now=%ld\n", now));
1226         IF_PAR_DEBUG(fish, // verbose,
1227              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1228                  (last_fish_arrived_at!=0 &&
1229                   last_fish_arrived_at+delay > now)) {
1230                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1231                      now, last_fish_arrived_at+delay, 
1232                      last_fish_arrived_at,
1233                      delay);
1234              });
1235   
1236         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1237             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1238           if (last_fish_arrived_at==0 ||
1239               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1240             /* outstandingFishes is set in sendFish, processFish;
1241                avoid flooding system with fishes via delay */
1242     next_fish_to_send_at = 0;  
1243   } else {
1244     /* ToDo: this should be done in the main scheduling loop to avoid the
1245              busy wait here; not so bad if fish delay is very small  */
1246     int iq = 0; // DEBUGGING -- HWL
1247     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1248     /* send a fish when ready, but process messages that arrive in the meantime */
1249     do {
1250       if (PacketsWaiting()) {
1251         iq++; // DEBUGGING
1252         *receivedFinish = processMessages();
1253       }
1254       now = msTime();
1255     } while (!*receivedFinish || now<next_fish_to_send_at);
1256     // JB: This means the fish could become obsolete, if we receive
1257     // work. Better check for work again? 
1258     // last line: while (!receivedFinish || !haveWork || now<...)
1259     // next line: if (receivedFinish || haveWork )
1260
1261     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1262       return rtsFalse;  // NB: this will leave scheduler loop
1263                         // immediately after return!
1264                           
1265     IF_PAR_DEBUG(fish, // verbose,
1266                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1267
1268   }
1269
1270     // JB: IMHO, this should all be hidden inside sendFish(...)
1271     /* pe = choosePE(); 
1272        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1273                 NEW_FISH_HUNGER);
1274
1275     // Global statistics: count no. of fishes
1276     if (RtsFlags.ParFlags.ParStats.Global &&
1277          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1278            globalParStats.tot_fish_mess++;
1279            }
1280     */ 
1281
1282   /* delayed fishes must have been sent by now! */
1283   next_fish_to_send_at = 0;  
1284   }
1285       
1286   *receivedFinish = processMessages();
1287 # endif /* SPARKS */
1288
1289  return rtsFalse;
1290  /* NB: this function always returns rtsFalse, meaning the scheduler
1291     loop continues with the next iteration; 
1292     rationale: 
1293       return code means success in finding work; we enter this function
1294       if there is no local work, thus have to send a fish which takes
1295       time until it arrives with work; in the meantime we should process
1296       messages in the main loop;
1297  */
1298 }
1299 #endif // PARALLEL_HASKELL
1300
1301 /* ----------------------------------------------------------------------------
1302  * PAR/GRAN: Report stats & debugging info(?)
1303  * ------------------------------------------------------------------------- */
1304
1305 #if defined(PAR) || defined(GRAN)
1306 static void
1307 scheduleGranParReport(void)
1308 {
1309   ASSERT(run_queue_hd != END_TSO_QUEUE);
1310
1311   /* Take a thread from the run queue, if we have work */
1312   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1313
1314     /* If this TSO has got its outport closed in the meantime, 
1315      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1316      * It has to be marked as TH_DEAD for this purpose.
1317      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1318
1319 JB: TODO: investigate wether state change field could be nuked
1320      entirely and replaced by the normal tso state (whatnext
1321      field). All we want to do is to kill tsos from outside.
1322      */
1323
1324     /* ToDo: write something to the log-file
1325     if (RTSflags.ParFlags.granSimStats && !sameThread)
1326         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1327
1328     CurrentTSO = t;
1329     */
1330     /* the spark pool for the current PE */
1331     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1332
1333     IF_DEBUG(scheduler, 
1334              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1335                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1336
1337     IF_PAR_DEBUG(fish,
1338              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1339                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1340
1341     if (RtsFlags.ParFlags.ParStats.Full && 
1342         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1343         (emitSchedule || // forced emit
1344          (t && LastTSO && t->id != LastTSO->id))) {
1345       /* 
1346          we are running a different TSO, so write a schedule event to log file
1347          NB: If we use fair scheduling we also have to write  a deschedule 
1348              event for LastTSO; with unfair scheduling we know that the
1349              previous tso has blocked whenever we switch to another tso, so
1350              we don't need it in GUM for now
1351       */
1352       IF_PAR_DEBUG(fish, // schedule,
1353                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1354
1355       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1356                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1357       emitSchedule = rtsFalse;
1358     }
1359 }     
1360 #endif
1361
1362 /* ----------------------------------------------------------------------------
1363  * After running a thread...
1364  * ------------------------------------------------------------------------- */
1365
1366 static void
1367 schedulePostRunThread(void)
1368 {
1369 #if defined(PAR)
1370     /* HACK 675: if the last thread didn't yield, make sure to print a 
1371        SCHEDULE event to the log file when StgRunning the next thread, even
1372        if it is the same one as before */
1373     LastTSO = t; 
1374     TimeOfLastYield = CURRENT_TIME;
1375 #endif
1376
1377   /* some statistics gathering in the parallel case */
1378
1379 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1380   switch (ret) {
1381     case HeapOverflow:
1382 # if defined(GRAN)
1383       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1384       globalGranStats.tot_heapover++;
1385 # elif defined(PAR)
1386       globalParStats.tot_heapover++;
1387 # endif
1388       break;
1389
1390      case StackOverflow:
1391 # if defined(GRAN)
1392       IF_DEBUG(gran, 
1393                DumpGranEvent(GR_DESCHEDULE, t));
1394       globalGranStats.tot_stackover++;
1395 # elif defined(PAR)
1396       // IF_DEBUG(par, 
1397       // DumpGranEvent(GR_DESCHEDULE, t);
1398       globalParStats.tot_stackover++;
1399 # endif
1400       break;
1401
1402     case ThreadYielding:
1403 # if defined(GRAN)
1404       IF_DEBUG(gran, 
1405                DumpGranEvent(GR_DESCHEDULE, t));
1406       globalGranStats.tot_yields++;
1407 # elif defined(PAR)
1408       // IF_DEBUG(par, 
1409       // DumpGranEvent(GR_DESCHEDULE, t);
1410       globalParStats.tot_yields++;
1411 # endif
1412       break; 
1413
1414     case ThreadBlocked:
1415 # if defined(GRAN)
1416       IF_DEBUG(scheduler,
1417                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1418                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1419                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1420                if (t->block_info.closure!=(StgClosure*)NULL)
1421                  print_bq(t->block_info.closure);
1422                debugBelch("\n"));
1423
1424       // ??? needed; should emit block before
1425       IF_DEBUG(gran, 
1426                DumpGranEvent(GR_DESCHEDULE, t)); 
1427       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1428       /*
1429         ngoq Dogh!
1430       ASSERT(procStatus[CurrentProc]==Busy || 
1431               ((procStatus[CurrentProc]==Fetching) && 
1432               (t->block_info.closure!=(StgClosure*)NULL)));
1433       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1434           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1435             procStatus[CurrentProc]==Fetching)) 
1436         procStatus[CurrentProc] = Idle;
1437       */
1438 # elif defined(PAR)
1439 //++PAR++  blockThread() writes the event (change?)
1440 # endif
1441     break;
1442
1443   case ThreadFinished:
1444     break;
1445
1446   default:
1447     barf("parGlobalStats: unknown return code");
1448     break;
1449     }
1450 #endif
1451 }
1452
1453 /* -----------------------------------------------------------------------------
1454  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1455  * -------------------------------------------------------------------------- */
1456
1457 static rtsBool
1458 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1459 {
1460     // did the task ask for a large block?
1461     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1462         // if so, get one and push it on the front of the nursery.
1463         bdescr *bd;
1464         lnat blocks;
1465         
1466         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1467         
1468         IF_DEBUG(scheduler,
1469                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1470                             (long)t->id, whatNext_strs[t->what_next], blocks));
1471         
1472         // don't do this if the nursery is (nearly) full, we'll GC first.
1473         if (cap->r.rCurrentNursery->link != NULL ||
1474             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1475                                                // if the nursery has only one block.
1476             
1477             ACQUIRE_SM_LOCK
1478             bd = allocGroup( blocks );
1479             RELEASE_SM_LOCK
1480             cap->r.rNursery->n_blocks += blocks;
1481             
1482             // link the new group into the list
1483             bd->link = cap->r.rCurrentNursery;
1484             bd->u.back = cap->r.rCurrentNursery->u.back;
1485             if (cap->r.rCurrentNursery->u.back != NULL) {
1486                 cap->r.rCurrentNursery->u.back->link = bd;
1487             } else {
1488 #if !defined(SMP)
1489                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1490                        g0s0 == cap->r.rNursery);
1491 #endif
1492                 cap->r.rNursery->blocks = bd;
1493             }             
1494             cap->r.rCurrentNursery->u.back = bd;
1495             
1496             // initialise it as a nursery block.  We initialise the
1497             // step, gen_no, and flags field of *every* sub-block in
1498             // this large block, because this is easier than making
1499             // sure that we always find the block head of a large
1500             // block whenever we call Bdescr() (eg. evacuate() and
1501             // isAlive() in the GC would both have to do this, at
1502             // least).
1503             { 
1504                 bdescr *x;
1505                 for (x = bd; x < bd + blocks; x++) {
1506                     x->step = cap->r.rNursery;
1507                     x->gen_no = 0;
1508                     x->flags = 0;
1509                 }
1510             }
1511             
1512             // This assert can be a killer if the app is doing lots
1513             // of large block allocations.
1514             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1515             
1516             // now update the nursery to point to the new block
1517             cap->r.rCurrentNursery = bd;
1518             
1519             // we might be unlucky and have another thread get on the
1520             // run queue before us and steal the large block, but in that
1521             // case the thread will just end up requesting another large
1522             // block.
1523             pushOnRunQueue(cap,t);
1524             return rtsFalse;  /* not actually GC'ing */
1525         }
1526     }
1527     
1528     IF_DEBUG(scheduler,
1529              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1530                         (long)t->id, whatNext_strs[t->what_next]));
1531 #if defined(GRAN)
1532     ASSERT(!is_on_queue(t,CurrentProc));
1533 #elif defined(PARALLEL_HASKELL)
1534     /* Currently we emit a DESCHEDULE event before GC in GUM.
1535        ToDo: either add separate event to distinguish SYSTEM time from rest
1536        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1537     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1538         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1539                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1540         emitSchedule = rtsTrue;
1541     }
1542 #endif
1543       
1544     pushOnRunQueue(cap,t);
1545     return rtsTrue;
1546     /* actual GC is done at the end of the while loop in schedule() */
1547 }
1548
1549 /* -----------------------------------------------------------------------------
1550  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1551  * -------------------------------------------------------------------------- */
1552
1553 static void
1554 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1555 {
1556     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1557                                   (long)t->id, whatNext_strs[t->what_next]));
1558     /* just adjust the stack for this thread, then pop it back
1559      * on the run queue.
1560      */
1561     { 
1562         /* enlarge the stack */
1563         StgTSO *new_t = threadStackOverflow(cap, t);
1564         
1565         /* The TSO attached to this Task may have moved, so update the
1566          * pointer to it.
1567          */
1568         if (task->tso == t) {
1569             task->tso = new_t;
1570         }
1571         pushOnRunQueue(cap,new_t);
1572     }
1573 }
1574
1575 /* -----------------------------------------------------------------------------
1576  * Handle a thread that returned to the scheduler with ThreadYielding
1577  * -------------------------------------------------------------------------- */
1578
1579 static rtsBool
1580 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1581 {
1582     // Reset the context switch flag.  We don't do this just before
1583     // running the thread, because that would mean we would lose ticks
1584     // during GC, which can lead to unfair scheduling (a thread hogs
1585     // the CPU because the tick always arrives during GC).  This way
1586     // penalises threads that do a lot of allocation, but that seems
1587     // better than the alternative.
1588     context_switch = 0;
1589     
1590     /* put the thread back on the run queue.  Then, if we're ready to
1591      * GC, check whether this is the last task to stop.  If so, wake
1592      * up the GC thread.  getThread will block during a GC until the
1593      * GC is finished.
1594      */
1595     IF_DEBUG(scheduler,
1596              if (t->what_next != prev_what_next) {
1597                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1598                             (long)t->id, whatNext_strs[t->what_next]);
1599              } else {
1600                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1601                             (long)t->id, whatNext_strs[t->what_next]);
1602              }
1603         );
1604     
1605     IF_DEBUG(sanity,
1606              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1607              checkTSO(t));
1608     ASSERT(t->link == END_TSO_QUEUE);
1609     
1610     // Shortcut if we're just switching evaluators: don't bother
1611     // doing stack squeezing (which can be expensive), just run the
1612     // thread.
1613     if (t->what_next != prev_what_next) {
1614         return rtsTrue;
1615     }
1616     
1617 #if defined(GRAN)
1618     ASSERT(!is_on_queue(t,CurrentProc));
1619       
1620     IF_DEBUG(sanity,
1621              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1622              checkThreadQsSanity(rtsTrue));
1623
1624 #endif
1625
1626     addToRunQueue(cap,t);
1627
1628 #if defined(GRAN)
1629     /* add a ContinueThread event to actually process the thread */
1630     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1631               ContinueThread,
1632               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1633     IF_GRAN_DEBUG(bq, 
1634                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1635                   G_EVENTQ(0);
1636                   G_CURR_THREADQ(0));
1637 #endif
1638     return rtsFalse;
1639 }
1640
1641 /* -----------------------------------------------------------------------------
1642  * Handle a thread that returned to the scheduler with ThreadBlocked
1643  * -------------------------------------------------------------------------- */
1644
1645 static void
1646 scheduleHandleThreadBlocked( StgTSO *t
1647 #if !defined(GRAN) && !defined(DEBUG)
1648     STG_UNUSED
1649 #endif
1650     )
1651 {
1652 #if defined(GRAN)
1653     IF_DEBUG(scheduler,
1654              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1655                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1656              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1657     
1658     // ??? needed; should emit block before
1659     IF_DEBUG(gran, 
1660              DumpGranEvent(GR_DESCHEDULE, t)); 
1661     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1662     /*
1663       ngoq Dogh!
1664       ASSERT(procStatus[CurrentProc]==Busy || 
1665       ((procStatus[CurrentProc]==Fetching) && 
1666       (t->block_info.closure!=(StgClosure*)NULL)));
1667       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1668       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1669       procStatus[CurrentProc]==Fetching)) 
1670       procStatus[CurrentProc] = Idle;
1671     */
1672 #elif defined(PAR)
1673     IF_DEBUG(scheduler,
1674              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1675                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1676     IF_PAR_DEBUG(bq,
1677                  
1678                  if (t->block_info.closure!=(StgClosure*)NULL) 
1679                  print_bq(t->block_info.closure));
1680     
1681     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1682     blockThread(t);
1683     
1684     /* whatever we schedule next, we must log that schedule */
1685     emitSchedule = rtsTrue;
1686     
1687 #else /* !GRAN */
1688
1689       // We don't need to do anything.  The thread is blocked, and it
1690       // has tidied up its stack and placed itself on whatever queue
1691       // it needs to be on.
1692
1693 #if !defined(SMP)
1694     ASSERT(t->why_blocked != NotBlocked);
1695              // This might not be true under SMP: we don't have
1696              // exclusive access to this TSO, so someone might have
1697              // woken it up by now.  This actually happens: try
1698              // conc023 +RTS -N2.
1699 #endif
1700
1701     IF_DEBUG(scheduler,
1702              debugBelch("--<< thread %d (%s) stopped: ", 
1703                         t->id, whatNext_strs[t->what_next]);
1704              printThreadBlockage(t);
1705              debugBelch("\n"));
1706     
1707     /* Only for dumping event to log file 
1708        ToDo: do I need this in GranSim, too?
1709        blockThread(t);
1710     */
1711 #endif
1712 }
1713
1714 /* -----------------------------------------------------------------------------
1715  * Handle a thread that returned to the scheduler with ThreadFinished
1716  * -------------------------------------------------------------------------- */
1717
1718 static rtsBool
1719 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1720 {
1721     /* Need to check whether this was a main thread, and if so,
1722      * return with the return value.
1723      *
1724      * We also end up here if the thread kills itself with an
1725      * uncaught exception, see Exception.cmm.
1726      */
1727     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1728                                   t->id, whatNext_strs[t->what_next]));
1729
1730 #if defined(GRAN)
1731       endThread(t, CurrentProc); // clean-up the thread
1732 #elif defined(PARALLEL_HASKELL)
1733       /* For now all are advisory -- HWL */
1734       //if(t->priority==AdvisoryPriority) ??
1735       advisory_thread_count--; // JB: Caution with this counter, buggy!
1736       
1737 # if defined(DIST)
1738       if(t->dist.priority==RevalPriority)
1739         FinishReval(t);
1740 # endif
1741     
1742 # if defined(EDENOLD)
1743       // the thread could still have an outport... (BUG)
1744       if (t->eden.outport != -1) {
1745       // delete the outport for the tso which has finished...
1746         IF_PAR_DEBUG(eden_ports,
1747                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1748                               t->eden.outport, t->id));
1749         deleteOPT(t);
1750       }
1751       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1752       if (t->eden.epid != -1) {
1753         IF_PAR_DEBUG(eden_ports,
1754                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1755                            t->id, t->eden.epid));
1756         removeTSOfromProcess(t);
1757       }
1758 # endif 
1759
1760 # if defined(PAR)
1761       if (RtsFlags.ParFlags.ParStats.Full &&
1762           !RtsFlags.ParFlags.ParStats.Suppressed) 
1763         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1764
1765       //  t->par only contains statistics: left out for now...
1766       IF_PAR_DEBUG(fish,
1767                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1768                               t->id,t,t->par.sparkname));
1769 # endif
1770 #endif // PARALLEL_HASKELL
1771
1772       //
1773       // Check whether the thread that just completed was a bound
1774       // thread, and if so return with the result.  
1775       //
1776       // There is an assumption here that all thread completion goes
1777       // through this point; we need to make sure that if a thread
1778       // ends up in the ThreadKilled state, that it stays on the run
1779       // queue so it can be dealt with here.
1780       //
1781
1782       if (t->bound) {
1783
1784           if (t->bound != task) {
1785 #if !defined(THREADED_RTS)
1786               // Must be a bound thread that is not the topmost one.  Leave
1787               // it on the run queue until the stack has unwound to the
1788               // point where we can deal with this.  Leaving it on the run
1789               // queue also ensures that the garbage collector knows about
1790               // this thread and its return value (it gets dropped from the
1791               // all_threads list so there's no other way to find it).
1792               appendToRunQueue(cap,t);
1793               return rtsFalse;
1794 #else
1795               // this cannot happen in the threaded RTS, because a
1796               // bound thread can only be run by the appropriate Task.
1797               barf("finished bound thread that isn't mine");
1798 #endif
1799           }
1800
1801           ASSERT(task->tso == t);
1802
1803           if (t->what_next == ThreadComplete) {
1804               if (task->ret) {
1805                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1806                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1807               }
1808               task->stat = Success;
1809           } else {
1810               if (task->ret) {
1811                   *(task->ret) = NULL;
1812               }
1813               if (interrupted) {
1814                   task->stat = Interrupted;
1815               } else {
1816                   task->stat = Killed;
1817               }
1818           }
1819 #ifdef DEBUG
1820           removeThreadLabel((StgWord)task->tso->id);
1821 #endif
1822           return rtsTrue; // tells schedule() to return
1823       }
1824
1825       return rtsFalse;
1826 }
1827
1828 /* -----------------------------------------------------------------------------
1829  * Perform a heap census, if PROFILING
1830  * -------------------------------------------------------------------------- */
1831
1832 static rtsBool
1833 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1834 {
1835 #if defined(PROFILING)
1836     // When we have +RTS -i0 and we're heap profiling, do a census at
1837     // every GC.  This lets us get repeatable runs for debugging.
1838     if (performHeapProfile ||
1839         (RtsFlags.ProfFlags.profileInterval==0 &&
1840          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1841         GarbageCollect(GetRoots, rtsTrue);
1842         heapCensus();
1843         performHeapProfile = rtsFalse;
1844         return rtsTrue;  // true <=> we already GC'd
1845     }
1846 #endif
1847     return rtsFalse;
1848 }
1849
1850 /* -----------------------------------------------------------------------------
1851  * Perform a garbage collection if necessary
1852  * -------------------------------------------------------------------------- */
1853
1854 static void
1855 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1856 {
1857     StgTSO *t;
1858 #ifdef SMP
1859     static volatile StgWord waiting_for_gc;
1860     rtsBool was_waiting;
1861     nat i;
1862 #endif
1863
1864 #ifdef SMP
1865     // In order to GC, there must be no threads running Haskell code.
1866     // Therefore, the GC thread needs to hold *all* the capabilities,
1867     // and release them after the GC has completed.  
1868     //
1869     // This seems to be the simplest way: previous attempts involved
1870     // making all the threads with capabilities give up their
1871     // capabilities and sleep except for the *last* one, which
1872     // actually did the GC.  But it's quite hard to arrange for all
1873     // the other tasks to sleep and stay asleep.
1874     //
1875         
1876     was_waiting = cas(&waiting_for_gc, 0, 1);
1877     if (was_waiting) {
1878         do {
1879             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1880             yieldCapability(&cap,task);
1881         } while (waiting_for_gc);
1882         return;
1883     }
1884
1885     for (i=0; i < n_capabilities; i++) {
1886         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1887         if (cap != &capabilities[i]) {
1888             Capability *pcap = &capabilities[i];
1889             // we better hope this task doesn't get migrated to
1890             // another Capability while we're waiting for this one.
1891             // It won't, because load balancing happens while we have
1892             // all the Capabilities, but even so it's a slightly
1893             // unsavoury invariant.
1894             task->cap = pcap;
1895             context_switch = 1;
1896             waitForReturnCapability(&pcap, task);
1897             if (pcap != &capabilities[i]) {
1898                 barf("scheduleDoGC: got the wrong capability");
1899             }
1900         }
1901     }
1902
1903     waiting_for_gc = rtsFalse;
1904 #endif
1905
1906     /* Kick any transactions which are invalid back to their
1907      * atomically frames.  When next scheduled they will try to
1908      * commit, this commit will fail and they will retry.
1909      */
1910     { 
1911         StgTSO *next;
1912
1913         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1914             if (t->what_next == ThreadRelocated) {
1915                 next = t->link;
1916             } else {
1917                 next = t->global_link;
1918                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1919                     if (!stmValidateNestOfTransactions (t -> trec)) {
1920                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1921                         
1922                         // strip the stack back to the
1923                         // ATOMICALLY_FRAME, aborting the (nested)
1924                         // transaction, and saving the stack of any
1925                         // partially-evaluated thunks on the heap.
1926                         raiseAsync_(cap, t, NULL, rtsTrue);
1927                         
1928 #ifdef REG_R1
1929                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1930 #endif
1931                     }
1932                 }
1933             }
1934         }
1935     }
1936     
1937     // so this happens periodically:
1938     scheduleCheckBlackHoles(cap);
1939     
1940     IF_DEBUG(scheduler, printAllThreads());
1941
1942     /* everybody back, start the GC.
1943      * Could do it in this thread, or signal a condition var
1944      * to do it in another thread.  Either way, we need to
1945      * broadcast on gc_pending_cond afterward.
1946      */
1947 #if defined(THREADED_RTS)
1948     IF_DEBUG(scheduler,sched_belch("doing GC"));
1949 #endif
1950     GarbageCollect(GetRoots, force_major);
1951     
1952 #if defined(SMP)
1953     // release our stash of capabilities.
1954     for (i = 0; i < n_capabilities; i++) {
1955         if (cap != &capabilities[i]) {
1956             task->cap = &capabilities[i];
1957             releaseCapability(&capabilities[i]);
1958         }
1959     }
1960     task->cap = cap;
1961 #endif
1962
1963 #if defined(GRAN)
1964     /* add a ContinueThread event to continue execution of current thread */
1965     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1966               ContinueThread,
1967               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1968     IF_GRAN_DEBUG(bq, 
1969                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1970                   G_EVENTQ(0);
1971                   G_CURR_THREADQ(0));
1972 #endif /* GRAN */
1973 }
1974
1975 /* ---------------------------------------------------------------------------
1976  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1977  * used by Control.Concurrent for error checking.
1978  * ------------------------------------------------------------------------- */
1979  
1980 StgBool
1981 rtsSupportsBoundThreads(void)
1982 {
1983 #if defined(THREADED_RTS)
1984   return rtsTrue;
1985 #else
1986   return rtsFalse;
1987 #endif
1988 }
1989
1990 /* ---------------------------------------------------------------------------
1991  * isThreadBound(tso): check whether tso is bound to an OS thread.
1992  * ------------------------------------------------------------------------- */
1993  
1994 StgBool
1995 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1996 {
1997 #if defined(THREADED_RTS)
1998   return (tso->bound != NULL);
1999 #endif
2000   return rtsFalse;
2001 }
2002
2003 /* ---------------------------------------------------------------------------
2004  * Singleton fork(). Do not copy any running threads.
2005  * ------------------------------------------------------------------------- */
2006
2007 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2008 #define FORKPROCESS_PRIMOP_SUPPORTED
2009 #endif
2010
2011 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2012 static void 
2013 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2014 #endif
2015 StgInt
2016 forkProcess(HsStablePtr *entry
2017 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2018             STG_UNUSED
2019 #endif
2020            )
2021 {
2022 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2023     pid_t pid;
2024     StgTSO* t,*next;
2025     Task *task;
2026     Capability *cap;
2027     
2028     IF_DEBUG(scheduler,sched_belch("forking!"));
2029     
2030     // ToDo: for SMP, we should probably acquire *all* the capabilities
2031     cap = rts_lock();
2032     
2033     pid = fork();
2034     
2035     if (pid) { // parent
2036         
2037         // just return the pid
2038         rts_unlock(cap);
2039         return pid;
2040         
2041     } else { // child
2042         
2043         // delete all threads
2044         cap->run_queue_hd = END_TSO_QUEUE;
2045         cap->run_queue_tl = END_TSO_QUEUE;
2046         
2047         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2048             next = t->link;
2049             
2050             // don't allow threads to catch the ThreadKilled exception
2051             deleteThreadImmediately(cap,t);
2052         }
2053         
2054         // wipe the main thread list
2055         while ((task = all_tasks) != NULL) {
2056             all_tasks = task->all_link;
2057             discardTask(task);
2058         }
2059         
2060         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2061         rts_checkSchedStatus("forkProcess",cap);
2062         
2063         rts_unlock(cap);
2064         hs_exit();                      // clean up and exit
2065         stg_exit(EXIT_SUCCESS);
2066     }
2067 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2068     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2069     return -1;
2070 #endif
2071 }
2072
2073 /* ---------------------------------------------------------------------------
2074  * Delete the threads on the run queue of the current capability.
2075  * ------------------------------------------------------------------------- */
2076    
2077 static void
2078 deleteRunQueue (Capability *cap)
2079 {
2080     StgTSO *t, *next;
2081     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2082         ASSERT(t->what_next != ThreadRelocated);
2083         next = t->link;
2084         deleteThread(cap, t);
2085     }
2086 }
2087
2088 /* startThread and  insertThread are now in GranSim.c -- HWL */
2089
2090
2091 /* -----------------------------------------------------------------------------
2092    Managing the suspended_ccalling_tasks list.
2093    Locks required: sched_mutex
2094    -------------------------------------------------------------------------- */
2095
2096 STATIC_INLINE void
2097 suspendTask (Capability *cap, Task *task)
2098 {
2099     ASSERT(task->next == NULL && task->prev == NULL);
2100     task->next = cap->suspended_ccalling_tasks;
2101     task->prev = NULL;
2102     if (cap->suspended_ccalling_tasks) {
2103         cap->suspended_ccalling_tasks->prev = task;
2104     }
2105     cap->suspended_ccalling_tasks = task;
2106 }
2107
2108 STATIC_INLINE void
2109 recoverSuspendedTask (Capability *cap, Task *task)
2110 {
2111     if (task->prev) {
2112         task->prev->next = task->next;
2113     } else {
2114         ASSERT(cap->suspended_ccalling_tasks == task);
2115         cap->suspended_ccalling_tasks = task->next;
2116     }
2117     if (task->next) {
2118         task->next->prev = task->prev;
2119     }
2120     task->next = task->prev = NULL;
2121 }
2122
2123 /* ---------------------------------------------------------------------------
2124  * Suspending & resuming Haskell threads.
2125  * 
2126  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2127  * its capability before calling the C function.  This allows another
2128  * task to pick up the capability and carry on running Haskell
2129  * threads.  It also means that if the C call blocks, it won't lock
2130  * the whole system.
2131  *
2132  * The Haskell thread making the C call is put to sleep for the
2133  * duration of the call, on the susepended_ccalling_threads queue.  We
2134  * give out a token to the task, which it can use to resume the thread
2135  * on return from the C function.
2136  * ------------------------------------------------------------------------- */
2137    
2138 void *
2139 suspendThread (StgRegTable *reg)
2140 {
2141   Capability *cap;
2142   int saved_errno = errno;
2143   StgTSO *tso;
2144   Task *task;
2145
2146   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2147    */
2148   cap = regTableToCapability(reg);
2149
2150   task = cap->running_task;
2151   tso = cap->r.rCurrentTSO;
2152
2153   IF_DEBUG(scheduler,
2154            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2155
2156   // XXX this might not be necessary --SDM
2157   tso->what_next = ThreadRunGHC;
2158
2159   threadPaused(tso);
2160
2161   if(tso->blocked_exceptions == NULL)  {
2162       tso->why_blocked = BlockedOnCCall;
2163       tso->blocked_exceptions = END_TSO_QUEUE;
2164   } else {
2165       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2166   }
2167
2168   // Hand back capability
2169   task->suspended_tso = tso;
2170
2171   ACQUIRE_LOCK(&cap->lock);
2172
2173   suspendTask(cap,task);
2174   cap->in_haskell = rtsFalse;
2175   releaseCapability_(cap);
2176   
2177   RELEASE_LOCK(&cap->lock);
2178
2179 #if defined(THREADED_RTS)
2180   /* Preparing to leave the RTS, so ensure there's a native thread/task
2181      waiting to take over.
2182   */
2183   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2184 #endif
2185
2186   errno = saved_errno;
2187   return task;
2188 }
2189
2190 StgRegTable *
2191 resumeThread (void *task_)
2192 {
2193     StgTSO *tso;
2194     Capability *cap;
2195     int saved_errno = errno;
2196     Task *task = task_;
2197
2198     cap = task->cap;
2199     // Wait for permission to re-enter the RTS with the result.
2200     waitForReturnCapability(&cap,task);
2201     // we might be on a different capability now... but if so, our
2202     // entry on the suspended_ccalling_tasks list will also have been
2203     // migrated.
2204
2205     // Remove the thread from the suspended list
2206     recoverSuspendedTask(cap,task);
2207
2208     tso = task->suspended_tso;
2209     task->suspended_tso = NULL;
2210     tso->link = END_TSO_QUEUE;
2211     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2212     
2213     if (tso->why_blocked == BlockedOnCCall) {
2214         awakenBlockedQueue(cap,tso->blocked_exceptions);
2215         tso->blocked_exceptions = NULL;
2216     }
2217     
2218     /* Reset blocking status */
2219     tso->why_blocked  = NotBlocked;
2220     
2221     cap->r.rCurrentTSO = tso;
2222     cap->in_haskell = rtsTrue;
2223     errno = saved_errno;
2224
2225     return &cap->r;
2226 }
2227
2228 /* ---------------------------------------------------------------------------
2229  * Comparing Thread ids.
2230  *
2231  * This is used from STG land in the implementation of the
2232  * instances of Eq/Ord for ThreadIds.
2233  * ------------------------------------------------------------------------ */
2234
2235 int
2236 cmp_thread(StgPtr tso1, StgPtr tso2) 
2237
2238   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2239   StgThreadID id2 = ((StgTSO *)tso2)->id;
2240  
2241   if (id1 < id2) return (-1);
2242   if (id1 > id2) return 1;
2243   return 0;
2244 }
2245
2246 /* ---------------------------------------------------------------------------
2247  * Fetching the ThreadID from an StgTSO.
2248  *
2249  * This is used in the implementation of Show for ThreadIds.
2250  * ------------------------------------------------------------------------ */
2251 int
2252 rts_getThreadId(StgPtr tso) 
2253 {
2254   return ((StgTSO *)tso)->id;
2255 }
2256
2257 #ifdef DEBUG
2258 void
2259 labelThread(StgPtr tso, char *label)
2260 {
2261   int len;
2262   void *buf;
2263
2264   /* Caveat: Once set, you can only set the thread name to "" */
2265   len = strlen(label)+1;
2266   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2267   strncpy(buf,label,len);
2268   /* Update will free the old memory for us */
2269   updateThreadLabel(((StgTSO *)tso)->id,buf);
2270 }
2271 #endif /* DEBUG */
2272
2273 /* ---------------------------------------------------------------------------
2274    Create a new thread.
2275
2276    The new thread starts with the given stack size.  Before the
2277    scheduler can run, however, this thread needs to have a closure
2278    (and possibly some arguments) pushed on its stack.  See
2279    pushClosure() in Schedule.h.
2280
2281    createGenThread() and createIOThread() (in SchedAPI.h) are
2282    convenient packaged versions of this function.
2283
2284    currently pri (priority) is only used in a GRAN setup -- HWL
2285    ------------------------------------------------------------------------ */
2286 #if defined(GRAN)
2287 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2288 StgTSO *
2289 createThread(nat size, StgInt pri)
2290 #else
2291 StgTSO *
2292 createThread(Capability *cap, nat size)
2293 #endif
2294 {
2295     StgTSO *tso;
2296     nat stack_size;
2297
2298     /* sched_mutex is *not* required */
2299
2300     /* First check whether we should create a thread at all */
2301 #if defined(PARALLEL_HASKELL)
2302     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2303     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2304         threadsIgnored++;
2305         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2306                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2307         return END_TSO_QUEUE;
2308     }
2309     threadsCreated++;
2310 #endif
2311
2312 #if defined(GRAN)
2313     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2314 #endif
2315
2316     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2317
2318     /* catch ridiculously small stack sizes */
2319     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2320         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2321     }
2322
2323     stack_size = size - TSO_STRUCT_SIZEW;
2324     
2325     tso = (StgTSO *)allocateLocal(cap, size);
2326     TICK_ALLOC_TSO(stack_size, 0);
2327
2328     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2329 #if defined(GRAN)
2330     SET_GRAN_HDR(tso, ThisPE);
2331 #endif
2332
2333     // Always start with the compiled code evaluator
2334     tso->what_next = ThreadRunGHC;
2335
2336     tso->why_blocked  = NotBlocked;
2337     tso->blocked_exceptions = NULL;
2338     
2339     tso->saved_errno = 0;
2340     tso->bound = NULL;
2341     
2342     tso->stack_size     = stack_size;
2343     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2344                           - TSO_STRUCT_SIZEW;
2345     tso->sp             = (P_)&(tso->stack) + stack_size;
2346
2347     tso->trec = NO_TREC;
2348     
2349 #ifdef PROFILING
2350     tso->prof.CCCS = CCS_MAIN;
2351 #endif
2352     
2353   /* put a stop frame on the stack */
2354     tso->sp -= sizeofW(StgStopFrame);
2355     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2356     tso->link = END_TSO_QUEUE;
2357     
2358   // ToDo: check this
2359 #if defined(GRAN)
2360     /* uses more flexible routine in GranSim */
2361     insertThread(tso, CurrentProc);
2362 #else
2363     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2364      * from its creation
2365      */
2366 #endif
2367     
2368 #if defined(GRAN) 
2369     if (RtsFlags.GranFlags.GranSimStats.Full) 
2370         DumpGranEvent(GR_START,tso);
2371 #elif defined(PARALLEL_HASKELL)
2372     if (RtsFlags.ParFlags.ParStats.Full) 
2373         DumpGranEvent(GR_STARTQ,tso);
2374     /* HACk to avoid SCHEDULE 
2375        LastTSO = tso; */
2376 #endif
2377     
2378     /* Link the new thread on the global thread list.
2379      */
2380     ACQUIRE_LOCK(&sched_mutex);
2381     tso->id = next_thread_id++;  // while we have the mutex
2382     tso->global_link = all_threads;
2383     all_threads = tso;
2384     RELEASE_LOCK(&sched_mutex);
2385     
2386 #if defined(DIST)
2387     tso->dist.priority = MandatoryPriority; //by default that is...
2388 #endif
2389     
2390 #if defined(GRAN)
2391     tso->gran.pri = pri;
2392 # if defined(DEBUG)
2393     tso->gran.magic = TSO_MAGIC; // debugging only
2394 # endif
2395     tso->gran.sparkname   = 0;
2396     tso->gran.startedat   = CURRENT_TIME; 
2397     tso->gran.exported    = 0;
2398     tso->gran.basicblocks = 0;
2399     tso->gran.allocs      = 0;
2400     tso->gran.exectime    = 0;
2401     tso->gran.fetchtime   = 0;
2402     tso->gran.fetchcount  = 0;
2403     tso->gran.blocktime   = 0;
2404     tso->gran.blockcount  = 0;
2405     tso->gran.blockedat   = 0;
2406     tso->gran.globalsparks = 0;
2407     tso->gran.localsparks  = 0;
2408     if (RtsFlags.GranFlags.Light)
2409         tso->gran.clock  = Now; /* local clock */
2410     else
2411         tso->gran.clock  = 0;
2412     
2413     IF_DEBUG(gran,printTSO(tso));
2414 #elif defined(PARALLEL_HASKELL)
2415 # if defined(DEBUG)
2416     tso->par.magic = TSO_MAGIC; // debugging only
2417 # endif
2418     tso->par.sparkname   = 0;
2419     tso->par.startedat   = CURRENT_TIME; 
2420     tso->par.exported    = 0;
2421     tso->par.basicblocks = 0;
2422     tso->par.allocs      = 0;
2423     tso->par.exectime    = 0;
2424     tso->par.fetchtime   = 0;
2425     tso->par.fetchcount  = 0;
2426     tso->par.blocktime   = 0;
2427     tso->par.blockcount  = 0;
2428     tso->par.blockedat   = 0;
2429     tso->par.globalsparks = 0;
2430     tso->par.localsparks  = 0;
2431 #endif
2432     
2433 #if defined(GRAN)
2434     globalGranStats.tot_threads_created++;
2435     globalGranStats.threads_created_on_PE[CurrentProc]++;
2436     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2437     globalGranStats.tot_sq_probes++;
2438 #elif defined(PARALLEL_HASKELL)
2439     // collect parallel global statistics (currently done together with GC stats)
2440     if (RtsFlags.ParFlags.ParStats.Global &&
2441         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2442         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2443         globalParStats.tot_threads_created++;
2444     }
2445 #endif 
2446     
2447 #if defined(GRAN)
2448     IF_GRAN_DEBUG(pri,
2449                   sched_belch("==__ schedule: Created TSO %d (%p);",
2450                               CurrentProc, tso, tso->id));
2451 #elif defined(PARALLEL_HASKELL)
2452     IF_PAR_DEBUG(verbose,
2453                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2454                              (long)tso->id, tso, advisory_thread_count));
2455 #else
2456     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2457                                    (long)tso->id, (long)tso->stack_size));
2458 #endif    
2459     return tso;
2460 }
2461
2462 #if defined(PAR)
2463 /* RFP:
2464    all parallel thread creation calls should fall through the following routine.
2465 */
2466 StgTSO *
2467 createThreadFromSpark(rtsSpark spark) 
2468 { StgTSO *tso;
2469   ASSERT(spark != (rtsSpark)NULL);
2470 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2471   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2472   { threadsIgnored++;
2473     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2474           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2475     return END_TSO_QUEUE;
2476   }
2477   else
2478   { threadsCreated++;
2479     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2480     if (tso==END_TSO_QUEUE)     
2481       barf("createSparkThread: Cannot create TSO");
2482 #if defined(DIST)
2483     tso->priority = AdvisoryPriority;
2484 #endif
2485     pushClosure(tso,spark);
2486     addToRunQueue(tso);
2487     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2488   }
2489   return tso;
2490 }
2491 #endif
2492
2493 /*
2494   Turn a spark into a thread.
2495   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2496 */
2497 #if 0
2498 StgTSO *
2499 activateSpark (rtsSpark spark) 
2500 {
2501   StgTSO *tso;
2502
2503   tso = createSparkThread(spark);
2504   if (RtsFlags.ParFlags.ParStats.Full) {   
2505     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2506       IF_PAR_DEBUG(verbose,
2507                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2508                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2509   }
2510   // ToDo: fwd info on local/global spark to thread -- HWL
2511   // tso->gran.exported =  spark->exported;
2512   // tso->gran.locked =   !spark->global;
2513   // tso->gran.sparkname = spark->name;
2514
2515   return tso;
2516 }
2517 #endif
2518
2519 /* ---------------------------------------------------------------------------
2520  * scheduleThread()
2521  *
2522  * scheduleThread puts a thread on the end  of the runnable queue.
2523  * This will usually be done immediately after a thread is created.
2524  * The caller of scheduleThread must create the thread using e.g.
2525  * createThread and push an appropriate closure
2526  * on this thread's stack before the scheduler is invoked.
2527  * ------------------------------------------------------------------------ */
2528
2529 void
2530 scheduleThread(Capability *cap, StgTSO *tso)
2531 {
2532     // The thread goes at the *end* of the run-queue, to avoid possible
2533     // starvation of any threads already on the queue.
2534     appendToRunQueue(cap,tso);
2535 }
2536
2537 Capability *
2538 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2539 {
2540     Task *task;
2541
2542     // We already created/initialised the Task
2543     task = cap->running_task;
2544
2545     // This TSO is now a bound thread; make the Task and TSO
2546     // point to each other.
2547     tso->bound = task;
2548
2549     task->tso = tso;
2550     task->ret = ret;
2551     task->stat = NoStatus;
2552
2553     appendToRunQueue(cap,tso);
2554
2555     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2556
2557 #if defined(GRAN)
2558     /* GranSim specific init */
2559     CurrentTSO = m->tso;                // the TSO to run
2560     procStatus[MainProc] = Busy;        // status of main PE
2561     CurrentProc = MainProc;             // PE to run it on
2562 #endif
2563
2564     cap = schedule(cap,task);
2565
2566     ASSERT(task->stat != NoStatus);
2567     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2568
2569     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2570     return cap;
2571 }
2572
2573 /* ----------------------------------------------------------------------------
2574  * Starting Tasks
2575  * ------------------------------------------------------------------------- */
2576
2577 #if defined(THREADED_RTS)
2578 void
2579 workerStart(Task *task)
2580 {
2581     Capability *cap;
2582
2583     // See startWorkerTask().
2584     ACQUIRE_LOCK(&task->lock);
2585     cap = task->cap;
2586     RELEASE_LOCK(&task->lock);
2587
2588     // set the thread-local pointer to the Task:
2589     taskEnter(task);
2590
2591     // schedule() runs without a lock.
2592     cap = schedule(cap,task);
2593
2594     // On exit from schedule(), we have a Capability.
2595     releaseCapability(cap);
2596     taskStop(task);
2597 }
2598 #endif
2599
2600 /* ---------------------------------------------------------------------------
2601  * initScheduler()
2602  *
2603  * Initialise the scheduler.  This resets all the queues - if the
2604  * queues contained any threads, they'll be garbage collected at the
2605  * next pass.
2606  *
2607  * ------------------------------------------------------------------------ */
2608
2609 void 
2610 initScheduler(void)
2611 {
2612 #if defined(GRAN)
2613   nat i;
2614   for (i=0; i<=MAX_PROC; i++) {
2615     run_queue_hds[i]      = END_TSO_QUEUE;
2616     run_queue_tls[i]      = END_TSO_QUEUE;
2617     blocked_queue_hds[i]  = END_TSO_QUEUE;
2618     blocked_queue_tls[i]  = END_TSO_QUEUE;
2619     ccalling_threadss[i]  = END_TSO_QUEUE;
2620     blackhole_queue[i]    = END_TSO_QUEUE;
2621     sleeping_queue        = END_TSO_QUEUE;
2622   }
2623 #elif !defined(THREADED_RTS)
2624   blocked_queue_hd  = END_TSO_QUEUE;
2625   blocked_queue_tl  = END_TSO_QUEUE;
2626   sleeping_queue    = END_TSO_QUEUE;
2627 #endif
2628
2629   blackhole_queue   = END_TSO_QUEUE;
2630   all_threads       = END_TSO_QUEUE;
2631
2632   context_switch = 0;
2633   interrupted    = 0;
2634
2635   RtsFlags.ConcFlags.ctxtSwitchTicks =
2636       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2637       
2638 #if defined(THREADED_RTS)
2639   /* Initialise the mutex and condition variables used by
2640    * the scheduler. */
2641   initMutex(&sched_mutex);
2642 #endif
2643   
2644   ACQUIRE_LOCK(&sched_mutex);
2645
2646   /* A capability holds the state a native thread needs in
2647    * order to execute STG code. At least one capability is
2648    * floating around (only SMP builds have more than one).
2649    */
2650   initCapabilities();
2651
2652   initTaskManager();
2653
2654 #if defined(SMP)
2655   /*
2656    * Eagerly start one worker to run each Capability, except for
2657    * Capability 0.  The idea is that we're probably going to start a
2658    * bound thread on Capability 0 pretty soon, so we don't want a
2659    * worker task hogging it.
2660    */
2661   { 
2662       nat i;
2663       Capability *cap;
2664       for (i = 1; i < n_capabilities; i++) {
2665           cap = &capabilities[i];
2666           ACQUIRE_LOCK(&cap->lock);
2667           startWorkerTask(cap, workerStart);
2668           RELEASE_LOCK(&cap->lock);
2669       }
2670   }
2671 #endif
2672
2673 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2674   initSparkPools();
2675 #endif
2676
2677   RELEASE_LOCK(&sched_mutex);
2678 }
2679
2680 void
2681 exitScheduler( void )
2682 {
2683     interrupted = rtsTrue;
2684     shutting_down_scheduler = rtsTrue;
2685
2686 #if defined(THREADED_RTS)
2687     { 
2688         Task *task;
2689         nat i;
2690         
2691         ACQUIRE_LOCK(&sched_mutex);
2692         task = newBoundTask();
2693         RELEASE_LOCK(&sched_mutex);
2694
2695         for (i = 0; i < n_capabilities; i++) {
2696             shutdownCapability(&capabilities[i], task);
2697         }
2698         boundTaskExiting(task);
2699         stopTaskManager();
2700     }
2701 #endif
2702 }
2703
2704 /* ---------------------------------------------------------------------------
2705    Where are the roots that we know about?
2706
2707         - all the threads on the runnable queue
2708         - all the threads on the blocked queue
2709         - all the threads on the sleeping queue
2710         - all the thread currently executing a _ccall_GC
2711         - all the "main threads"
2712      
2713    ------------------------------------------------------------------------ */
2714
2715 /* This has to be protected either by the scheduler monitor, or by the
2716         garbage collection monitor (probably the latter).
2717         KH @ 25/10/99
2718 */
2719
2720 void
2721 GetRoots( evac_fn evac )
2722 {
2723     nat i;
2724     Capability *cap;
2725     Task *task;
2726
2727 #if defined(GRAN)
2728     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2729         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2730             evac((StgClosure **)&run_queue_hds[i]);
2731         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2732             evac((StgClosure **)&run_queue_tls[i]);
2733         
2734         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2735             evac((StgClosure **)&blocked_queue_hds[i]);
2736         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2737             evac((StgClosure **)&blocked_queue_tls[i]);
2738         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2739             evac((StgClosure **)&ccalling_threads[i]);
2740     }
2741
2742     markEventQueue();
2743
2744 #else /* !GRAN */
2745
2746     for (i = 0; i < n_capabilities; i++) {
2747         cap = &capabilities[i];
2748         evac((StgClosure **)&cap->run_queue_hd);
2749         evac((StgClosure **)&cap->run_queue_tl);
2750         
2751         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2752              task=task->next) {
2753             evac((StgClosure **)&task->suspended_tso);
2754         }
2755     }
2756     
2757 #if !defined(THREADED_RTS)
2758     evac((StgClosure **)&blocked_queue_hd);
2759     evac((StgClosure **)&blocked_queue_tl);
2760     evac((StgClosure **)&sleeping_queue);
2761 #endif 
2762 #endif
2763
2764     evac((StgClosure **)&blackhole_queue);
2765
2766 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2767     markSparkQueue(evac);
2768 #endif
2769     
2770 #if defined(RTS_USER_SIGNALS)
2771     // mark the signal handlers (signals should be already blocked)
2772     markSignalHandlers(evac);
2773 #endif
2774 }
2775
2776 /* -----------------------------------------------------------------------------
2777    performGC
2778
2779    This is the interface to the garbage collector from Haskell land.
2780    We provide this so that external C code can allocate and garbage
2781    collect when called from Haskell via _ccall_GC.
2782
2783    It might be useful to provide an interface whereby the programmer
2784    can specify more roots (ToDo).
2785    
2786    This needs to be protected by the GC condition variable above.  KH.
2787    -------------------------------------------------------------------------- */
2788
2789 static void (*extra_roots)(evac_fn);
2790
2791 void
2792 performGC(void)
2793 {
2794 #ifdef THREADED_RTS
2795     // ToDo: we have to grab all the capabilities here.
2796     errorBelch("performGC not supported in threaded RTS (yet)");
2797     stg_exit(EXIT_FAILURE);
2798 #endif
2799     /* Obligated to hold this lock upon entry */
2800     GarbageCollect(GetRoots,rtsFalse);
2801 }
2802
2803 void
2804 performMajorGC(void)
2805 {
2806 #ifdef THREADED_RTS
2807     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2808     stg_exit(EXIT_FAILURE);
2809 #endif
2810     GarbageCollect(GetRoots,rtsTrue);
2811 }
2812
2813 static void
2814 AllRoots(evac_fn evac)
2815 {
2816     GetRoots(evac);             // the scheduler's roots
2817     extra_roots(evac);          // the user's roots
2818 }
2819
2820 void
2821 performGCWithRoots(void (*get_roots)(evac_fn))
2822 {
2823 #ifdef THREADED_RTS
2824     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2825     stg_exit(EXIT_FAILURE);
2826 #endif
2827     extra_roots = get_roots;
2828     GarbageCollect(AllRoots,rtsFalse);
2829 }
2830
2831 /* -----------------------------------------------------------------------------
2832    Stack overflow
2833
2834    If the thread has reached its maximum stack size, then raise the
2835    StackOverflow exception in the offending thread.  Otherwise
2836    relocate the TSO into a larger chunk of memory and adjust its stack
2837    size appropriately.
2838    -------------------------------------------------------------------------- */
2839
2840 static StgTSO *
2841 threadStackOverflow(Capability *cap, StgTSO *tso)
2842 {
2843   nat new_stack_size, stack_words;
2844   lnat new_tso_size;
2845   StgPtr new_sp;
2846   StgTSO *dest;
2847
2848   IF_DEBUG(sanity,checkTSO(tso));
2849   if (tso->stack_size >= tso->max_stack_size) {
2850
2851     IF_DEBUG(gc,
2852              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2853                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2854              /* If we're debugging, just print out the top of the stack */
2855              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2856                                               tso->sp+64)));
2857
2858     /* Send this thread the StackOverflow exception */
2859     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2860     return tso;
2861   }
2862
2863   /* Try to double the current stack size.  If that takes us over the
2864    * maximum stack size for this thread, then use the maximum instead.
2865    * Finally round up so the TSO ends up as a whole number of blocks.
2866    */
2867   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2868   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2869                                        TSO_STRUCT_SIZE)/sizeof(W_);
2870   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2871   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2872
2873   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2874
2875   dest = (StgTSO *)allocate(new_tso_size);
2876   TICK_ALLOC_TSO(new_stack_size,0);
2877
2878   /* copy the TSO block and the old stack into the new area */
2879   memcpy(dest,tso,TSO_STRUCT_SIZE);
2880   stack_words = tso->stack + tso->stack_size - tso->sp;
2881   new_sp = (P_)dest + new_tso_size - stack_words;
2882   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2883
2884   /* relocate the stack pointers... */
2885   dest->sp         = new_sp;
2886   dest->stack_size = new_stack_size;
2887         
2888   /* Mark the old TSO as relocated.  We have to check for relocated
2889    * TSOs in the garbage collector and any primops that deal with TSOs.
2890    *
2891    * It's important to set the sp value to just beyond the end
2892    * of the stack, so we don't attempt to scavenge any part of the
2893    * dead TSO's stack.
2894    */
2895   tso->what_next = ThreadRelocated;
2896   tso->link = dest;
2897   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2898   tso->why_blocked = NotBlocked;
2899
2900   IF_PAR_DEBUG(verbose,
2901                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2902                      tso->id, tso, tso->stack_size);
2903                /* If we're debugging, just print out the top of the stack */
2904                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2905                                                 tso->sp+64)));
2906   
2907   IF_DEBUG(sanity,checkTSO(tso));
2908 #if 0
2909   IF_DEBUG(scheduler,printTSO(dest));
2910 #endif
2911
2912   return dest;
2913 }
2914
2915 /* ---------------------------------------------------------------------------
2916    Wake up a queue that was blocked on some resource.
2917    ------------------------------------------------------------------------ */
2918
2919 #if defined(GRAN)
2920 STATIC_INLINE void
2921 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2922 {
2923 }
2924 #elif defined(PARALLEL_HASKELL)
2925 STATIC_INLINE void
2926 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2927 {
2928   /* write RESUME events to log file and
2929      update blocked and fetch time (depending on type of the orig closure) */
2930   if (RtsFlags.ParFlags.ParStats.Full) {
2931     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2932                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2933                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2934     if (emptyRunQueue())
2935       emitSchedule = rtsTrue;
2936
2937     switch (get_itbl(node)->type) {
2938         case FETCH_ME_BQ:
2939           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2940           break;
2941         case RBH:
2942         case FETCH_ME:
2943         case BLACKHOLE_BQ:
2944           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2945           break;
2946 #ifdef DIST
2947         case MVAR:
2948           break;
2949 #endif    
2950         default:
2951           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2952         }
2953       }
2954 }
2955 #endif
2956
2957 #if defined(GRAN)
2958 StgBlockingQueueElement *
2959 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2960 {
2961     StgTSO *tso;
2962     PEs node_loc, tso_loc;
2963
2964     node_loc = where_is(node); // should be lifted out of loop
2965     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2966     tso_loc = where_is((StgClosure *)tso);
2967     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2968       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2969       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2970       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2971       // insertThread(tso, node_loc);
2972       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2973                 ResumeThread,
2974                 tso, node, (rtsSpark*)NULL);
2975       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2976       // len_local++;
2977       // len++;
2978     } else { // TSO is remote (actually should be FMBQ)
2979       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2980                                   RtsFlags.GranFlags.Costs.gunblocktime +
2981                                   RtsFlags.GranFlags.Costs.latency;
2982       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2983                 UnblockThread,
2984                 tso, node, (rtsSpark*)NULL);
2985       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2986       // len++;
2987     }
2988     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2989     IF_GRAN_DEBUG(bq,
2990                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2991                           (node_loc==tso_loc ? "Local" : "Global"), 
2992                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2993     tso->block_info.closure = NULL;
2994     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2995                              tso->id, tso));
2996 }
2997 #elif defined(PARALLEL_HASKELL)
2998 StgBlockingQueueElement *
2999 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3000 {
3001     StgBlockingQueueElement *next;
3002
3003     switch (get_itbl(bqe)->type) {
3004     case TSO:
3005       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3006       /* if it's a TSO just push it onto the run_queue */
3007       next = bqe->link;
3008       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3009       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3010       threadRunnable();
3011       unblockCount(bqe, node);
3012       /* reset blocking status after dumping event */
3013       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3014       break;
3015
3016     case BLOCKED_FETCH:
3017       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3018       next = bqe->link;
3019       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3020       PendingFetches = (StgBlockedFetch *)bqe;
3021       break;
3022
3023 # if defined(DEBUG)
3024       /* can ignore this case in a non-debugging setup; 
3025          see comments on RBHSave closures above */
3026     case CONSTR:
3027       /* check that the closure is an RBHSave closure */
3028       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3029              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3030              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3031       break;
3032
3033     default:
3034       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3035            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3036            (StgClosure *)bqe);
3037 # endif
3038     }
3039   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3040   return next;
3041 }
3042 #endif
3043
3044 StgTSO *
3045 unblockOne(Capability *cap, StgTSO *tso)
3046 {
3047   StgTSO *next;
3048
3049   ASSERT(get_itbl(tso)->type == TSO);
3050   ASSERT(tso->why_blocked != NotBlocked);
3051   tso->why_blocked = NotBlocked;
3052   next = tso->link;
3053   tso->link = END_TSO_QUEUE;
3054
3055   // We might have just migrated this TSO to our Capability:
3056   if (tso->bound) {
3057       tso->bound->cap = cap;
3058   }
3059
3060   appendToRunQueue(cap,tso);
3061
3062   // we're holding a newly woken thread, make sure we context switch
3063   // quickly so we can migrate it if necessary.
3064   context_switch = 1;
3065   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3066   return next;
3067 }
3068
3069
3070 #if defined(GRAN)
3071 void 
3072 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3073 {
3074   StgBlockingQueueElement *bqe;
3075   PEs node_loc;
3076   nat len = 0; 
3077
3078   IF_GRAN_DEBUG(bq, 
3079                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3080                       node, CurrentProc, CurrentTime[CurrentProc], 
3081                       CurrentTSO->id, CurrentTSO));
3082
3083   node_loc = where_is(node);
3084
3085   ASSERT(q == END_BQ_QUEUE ||
3086          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3087          get_itbl(q)->type == CONSTR); // closure (type constructor)
3088   ASSERT(is_unique(node));
3089
3090   /* FAKE FETCH: magically copy the node to the tso's proc;
3091      no Fetch necessary because in reality the node should not have been 
3092      moved to the other PE in the first place
3093   */
3094   if (CurrentProc!=node_loc) {
3095     IF_GRAN_DEBUG(bq, 
3096                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3097                         node, node_loc, CurrentProc, CurrentTSO->id, 
3098                         // CurrentTSO, where_is(CurrentTSO),
3099                         node->header.gran.procs));
3100     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3101     IF_GRAN_DEBUG(bq, 
3102                   debugBelch("## new bitmask of node %p is %#x\n",
3103                         node, node->header.gran.procs));
3104     if (RtsFlags.GranFlags.GranSimStats.Global) {
3105       globalGranStats.tot_fake_fetches++;
3106     }
3107   }
3108
3109   bqe = q;
3110   // ToDo: check: ASSERT(CurrentProc==node_loc);
3111   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3112     //next = bqe->link;
3113     /* 
3114        bqe points to the current element in the queue
3115        next points to the next element in the queue
3116     */
3117     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3118     //tso_loc = where_is(tso);
3119     len++;
3120     bqe = unblockOne(bqe, node);
3121   }
3122
3123   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3124      the closure to make room for the anchor of the BQ */
3125   if (bqe!=END_BQ_QUEUE) {
3126     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3127     /*
3128     ASSERT((info_ptr==&RBH_Save_0_info) ||
3129            (info_ptr==&RBH_Save_1_info) ||
3130            (info_ptr==&RBH_Save_2_info));
3131     */
3132     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3133     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3134     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3135
3136     IF_GRAN_DEBUG(bq,
3137                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3138                         node, info_type(node)));
3139   }
3140
3141   /* statistics gathering */
3142   if (RtsFlags.GranFlags.GranSimStats.Global) {
3143     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3144     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3145     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3146     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3147   }
3148   IF_GRAN_DEBUG(bq,
3149                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3150                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3151 }
3152 #elif defined(PARALLEL_HASKELL)
3153 void 
3154 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3155 {
3156   StgBlockingQueueElement *bqe;
3157
3158   IF_PAR_DEBUG(verbose, 
3159                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3160                      node, mytid));
3161 #ifdef DIST  
3162   //RFP
3163   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3164     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3165     return;
3166   }
3167 #endif
3168   
3169   ASSERT(q == END_BQ_QUEUE ||
3170          get_itbl(q)->type == TSO ||           
3171          get_itbl(q)->type == BLOCKED_FETCH || 
3172          get_itbl(q)->type == CONSTR); 
3173
3174   bqe = q;
3175   while (get_itbl(bqe)->type==TSO || 
3176          get_itbl(bqe)->type==BLOCKED_FETCH) {
3177     bqe = unblockOne(bqe, node);
3178   }
3179 }
3180
3181 #else   /* !GRAN && !PARALLEL_HASKELL */
3182
3183 void
3184 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3185 {
3186     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3187                              // Exception.cmm
3188     while (tso != END_TSO_QUEUE) {
3189         tso = unblockOne(cap,tso);
3190     }
3191 }
3192 #endif
3193
3194 /* ---------------------------------------------------------------------------
3195    Interrupt execution
3196    - usually called inside a signal handler so it mustn't do anything fancy.   
3197    ------------------------------------------------------------------------ */
3198
3199 void
3200 interruptStgRts(void)
3201 {
3202     interrupted    = 1;
3203     context_switch = 1;
3204 #if defined(THREADED_RTS)
3205     prodAllCapabilities();
3206 #endif
3207 }
3208
3209 /* -----------------------------------------------------------------------------
3210    Unblock a thread
3211
3212    This is for use when we raise an exception in another thread, which
3213    may be blocked.
3214    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3215    -------------------------------------------------------------------------- */
3216
3217 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3218 /*
3219   NB: only the type of the blocking queue is different in GranSim and GUM
3220       the operations on the queue-elements are the same
3221       long live polymorphism!
3222
3223   Locks: sched_mutex is held upon entry and exit.
3224
3225 */
3226 static void
3227 unblockThread(Capability *cap, StgTSO *tso)
3228 {
3229   StgBlockingQueueElement *t, **last;
3230
3231   switch (tso->why_blocked) {
3232
3233   case NotBlocked:
3234     return;  /* not blocked */
3235
3236   case BlockedOnSTM:
3237     // Be careful: nothing to do here!  We tell the scheduler that the thread
3238     // is runnable and we leave it to the stack-walking code to abort the 
3239     // transaction while unwinding the stack.  We should perhaps have a debugging
3240     // test to make sure that this really happens and that the 'zombie' transaction
3241     // does not get committed.
3242     goto done;
3243
3244   case BlockedOnMVar:
3245     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3246     {
3247       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3248       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3249
3250       last = (StgBlockingQueueElement **)&mvar->head;
3251       for (t = (StgBlockingQueueElement *)mvar->head; 
3252            t != END_BQ_QUEUE; 
3253            last = &t->link, last_tso = t, t = t->link) {
3254         if (t == (StgBlockingQueueElement *)tso) {
3255           *last = (StgBlockingQueueElement *)tso->link;
3256           if (mvar->tail == tso) {
3257             mvar->tail = (StgTSO *)last_tso;
3258           }
3259           goto done;
3260         }
3261       }
3262       barf("unblockThread (MVAR): TSO not found");
3263     }
3264
3265   case BlockedOnBlackHole:
3266     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3267     {
3268       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3269
3270       last = &bq->blocking_queue;
3271       for (t = bq->blocking_queue; 
3272            t != END_BQ_QUEUE; 
3273            last = &t->link, t = t->link) {
3274         if (t == (StgBlockingQueueElement *)tso) {
3275           *last = (StgBlockingQueueElement *)tso->link;
3276           goto done;
3277         }
3278       }
3279       barf("unblockThread (BLACKHOLE): TSO not found");
3280     }
3281
3282   case BlockedOnException:
3283     {
3284       StgTSO *target  = tso->block_info.tso;
3285
3286       ASSERT(get_itbl(target)->type == TSO);
3287
3288       if (target->what_next == ThreadRelocated) {
3289           target = target->link;
3290           ASSERT(get_itbl(target)->type == TSO);
3291       }
3292
3293       ASSERT(target->blocked_exceptions != NULL);
3294
3295       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3296       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3297            t != END_BQ_QUEUE; 
3298            last = &t->link, t = t->link) {
3299         ASSERT(get_itbl(t)->type == TSO);
3300         if (t == (StgBlockingQueueElement *)tso) {
3301           *last = (StgBlockingQueueElement *)tso->link;
3302           goto done;
3303         }
3304       }
3305       barf("unblockThread (Exception): TSO not found");
3306     }
3307
3308   case BlockedOnRead:
3309   case BlockedOnWrite:
3310 #if defined(mingw32_HOST_OS)
3311   case BlockedOnDoProc:
3312 #endif
3313     {
3314       /* take TSO off blocked_queue */
3315       StgBlockingQueueElement *prev = NULL;
3316       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3317            prev = t, t = t->link) {
3318         if (t == (StgBlockingQueueElement *)tso) {
3319           if (prev == NULL) {
3320             blocked_queue_hd = (StgTSO *)t->link;
3321             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3322               blocked_queue_tl = END_TSO_QUEUE;
3323             }
3324           } else {
3325             prev->link = t->link;
3326             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3327               blocked_queue_tl = (StgTSO *)prev;
3328             }
3329           }
3330 #if defined(mingw32_HOST_OS)
3331           /* (Cooperatively) signal that the worker thread should abort
3332            * the request.
3333            */
3334           abandonWorkRequest(tso->block_info.async_result->reqID);
3335 #endif
3336           goto done;
3337         }
3338       }
3339       barf("unblockThread (I/O): TSO not found");
3340     }
3341
3342   case BlockedOnDelay:
3343     {
3344       /* take TSO off sleeping_queue */
3345       StgBlockingQueueElement *prev = NULL;
3346       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3347            prev = t, t = t->link) {
3348         if (t == (StgBlockingQueueElement *)tso) {
3349           if (prev == NULL) {
3350             sleeping_queue = (StgTSO *)t->link;
3351           } else {
3352             prev->link = t->link;
3353           }
3354           goto done;
3355         }
3356       }
3357       barf("unblockThread (delay): TSO not found");
3358     }
3359
3360   default:
3361     barf("unblockThread");
3362   }
3363
3364  done:
3365   tso->link = END_TSO_QUEUE;
3366   tso->why_blocked = NotBlocked;
3367   tso->block_info.closure = NULL;
3368   pushOnRunQueue(cap,tso);
3369 }
3370 #else
3371 static void
3372 unblockThread(Capability *cap, StgTSO *tso)
3373 {
3374   StgTSO *t, **last;
3375   
3376   /* To avoid locking unnecessarily. */
3377   if (tso->why_blocked == NotBlocked) {
3378     return;
3379   }
3380
3381   switch (tso->why_blocked) {
3382
3383   case BlockedOnSTM:
3384     // Be careful: nothing to do here!  We tell the scheduler that the thread
3385     // is runnable and we leave it to the stack-walking code to abort the 
3386     // transaction while unwinding the stack.  We should perhaps have a debugging
3387     // test to make sure that this really happens and that the 'zombie' transaction
3388     // does not get committed.
3389     goto done;
3390
3391   case BlockedOnMVar:
3392     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3393     {
3394       StgTSO *last_tso = END_TSO_QUEUE;
3395       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3396
3397       last = &mvar->head;
3398       for (t = mvar->head; t != END_TSO_QUEUE; 
3399            last = &t->link, last_tso = t, t = t->link) {
3400         if (t == tso) {
3401           *last = tso->link;
3402           if (mvar->tail == tso) {
3403             mvar->tail = last_tso;
3404           }
3405           goto done;
3406         }
3407       }
3408       barf("unblockThread (MVAR): TSO not found");
3409     }
3410
3411   case BlockedOnBlackHole:
3412     {
3413       last = &blackhole_queue;
3414       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3415            last = &t->link, t = t->link) {
3416         if (t == tso) {
3417           *last = tso->link;
3418           goto done;
3419         }
3420       }
3421       barf("unblockThread (BLACKHOLE): TSO not found");
3422     }
3423
3424   case BlockedOnException:
3425     {
3426       StgTSO *target  = tso->block_info.tso;
3427
3428       ASSERT(get_itbl(target)->type == TSO);
3429
3430       while (target->what_next == ThreadRelocated) {
3431           target = target->link;
3432           ASSERT(get_itbl(target)->type == TSO);
3433       }
3434       
3435       ASSERT(target->blocked_exceptions != NULL);
3436
3437       last = &target->blocked_exceptions;
3438       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3439            last = &t->link, t = t->link) {
3440         ASSERT(get_itbl(t)->type == TSO);
3441         if (t == tso) {
3442           *last = tso->link;
3443           goto done;
3444         }
3445       }
3446       barf("unblockThread (Exception): TSO not found");
3447     }
3448
3449 #if !defined(THREADED_RTS)
3450   case BlockedOnRead:
3451   case BlockedOnWrite:
3452 #if defined(mingw32_HOST_OS)
3453   case BlockedOnDoProc:
3454 #endif
3455     {
3456       StgTSO *prev = NULL;
3457       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3458            prev = t, t = t->link) {
3459         if (t == tso) {
3460           if (prev == NULL) {
3461             blocked_queue_hd = t->link;
3462             if (blocked_queue_tl == t) {
3463               blocked_queue_tl = END_TSO_QUEUE;
3464             }
3465           } else {
3466             prev->link = t->link;
3467             if (blocked_queue_tl == t) {
3468               blocked_queue_tl = prev;
3469             }
3470           }
3471 #if defined(mingw32_HOST_OS)
3472           /* (Cooperatively) signal that the worker thread should abort
3473            * the request.
3474            */
3475           abandonWorkRequest(tso->block_info.async_result->reqID);
3476 #endif
3477           goto done;
3478         }
3479       }
3480       barf("unblockThread (I/O): TSO not found");
3481     }
3482
3483   case BlockedOnDelay:
3484     {
3485       StgTSO *prev = NULL;
3486       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3487            prev = t, t = t->link) {
3488         if (t == tso) {
3489           if (prev == NULL) {
3490             sleeping_queue = t->link;
3491           } else {
3492             prev->link = t->link;
3493           }
3494           goto done;
3495         }
3496       }
3497       barf("unblockThread (delay): TSO not found");
3498     }
3499 #endif
3500
3501   default:
3502     barf("unblockThread");
3503   }
3504
3505  done:
3506   tso->link = END_TSO_QUEUE;
3507   tso->why_blocked = NotBlocked;
3508   tso->block_info.closure = NULL;
3509   appendToRunQueue(cap,tso);
3510 }
3511 #endif
3512
3513 /* -----------------------------------------------------------------------------
3514  * checkBlackHoles()
3515  *
3516  * Check the blackhole_queue for threads that can be woken up.  We do
3517  * this periodically: before every GC, and whenever the run queue is
3518  * empty.
3519  *
3520  * An elegant solution might be to just wake up all the blocked
3521  * threads with awakenBlockedQueue occasionally: they'll go back to
3522  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3523  * doesn't give us a way to tell whether we've actually managed to
3524  * wake up any threads, so we would be busy-waiting.
3525  *
3526  * -------------------------------------------------------------------------- */
3527
3528 static rtsBool
3529 checkBlackHoles (Capability *cap)
3530 {
3531     StgTSO **prev, *t;
3532     rtsBool any_woke_up = rtsFalse;
3533     StgHalfWord type;
3534
3535     // blackhole_queue is global:
3536     ASSERT_LOCK_HELD(&sched_mutex);
3537
3538     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3539
3540     // ASSUMES: sched_mutex
3541     prev = &blackhole_queue;
3542     t = blackhole_queue;
3543     while (t != END_TSO_QUEUE) {
3544         ASSERT(t->why_blocked == BlockedOnBlackHole);
3545         type = get_itbl(t->block_info.closure)->type;
3546         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3547             IF_DEBUG(sanity,checkTSO(t));
3548             t = unblockOne(cap, t);
3549             // urk, the threads migrate to the current capability
3550             // here, but we'd like to keep them on the original one.
3551             *prev = t;
3552             any_woke_up = rtsTrue;
3553         } else {
3554             prev = &t->link;
3555             t = t->link;
3556         }
3557     }
3558
3559     return any_woke_up;
3560 }
3561
3562 /* -----------------------------------------------------------------------------
3563  * raiseAsync()
3564  *
3565  * The following function implements the magic for raising an
3566  * asynchronous exception in an existing thread.
3567  *
3568  * We first remove the thread from any queue on which it might be
3569  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3570  *
3571  * We strip the stack down to the innermost CATCH_FRAME, building
3572  * thunks in the heap for all the active computations, so they can 
3573  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3574  * an application of the handler to the exception, and push it on
3575  * the top of the stack.
3576  * 
3577  * How exactly do we save all the active computations?  We create an
3578  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3579  * AP_STACKs pushes everything from the corresponding update frame
3580  * upwards onto the stack.  (Actually, it pushes everything up to the
3581  * next update frame plus a pointer to the next AP_STACK object.
3582  * Entering the next AP_STACK object pushes more onto the stack until we
3583  * reach the last AP_STACK object - at which point the stack should look
3584  * exactly as it did when we killed the TSO and we can continue
3585  * execution by entering the closure on top of the stack.
3586  *
3587  * We can also kill a thread entirely - this happens if either (a) the 
3588  * exception passed to raiseAsync is NULL, or (b) there's no
3589  * CATCH_FRAME on the stack.  In either case, we strip the entire
3590  * stack and replace the thread with a zombie.
3591  *
3592  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3593  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3594  * the TSO is currently blocked on or on the run queue of.
3595  *
3596  * -------------------------------------------------------------------------- */
3597  
3598 void
3599 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3600 {
3601     raiseAsync_(cap, tso, exception, rtsFalse);
3602 }
3603
3604 static void
3605 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3606             rtsBool stop_at_atomically)
3607 {
3608     StgRetInfoTable *info;
3609     StgPtr sp;
3610   
3611     // Thread already dead?
3612     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3613         return;
3614     }
3615
3616     IF_DEBUG(scheduler, 
3617              sched_belch("raising exception in thread %ld.", (long)tso->id));
3618     
3619     // Remove it from any blocking queues
3620     unblockThread(cap,tso);
3621
3622     sp = tso->sp;
3623     
3624     // The stack freezing code assumes there's a closure pointer on
3625     // the top of the stack, so we have to arrange that this is the case...
3626     //
3627     if (sp[0] == (W_)&stg_enter_info) {
3628         sp++;
3629     } else {
3630         sp--;
3631         sp[0] = (W_)&stg_dummy_ret_closure;
3632     }
3633
3634     while (1) {
3635         nat i;
3636
3637         // 1. Let the top of the stack be the "current closure"
3638         //
3639         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3640         // CATCH_FRAME.
3641         //
3642         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3643         // current closure applied to the chunk of stack up to (but not
3644         // including) the update frame.  This closure becomes the "current
3645         // closure".  Go back to step 2.
3646         //
3647         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3648         // top of the stack applied to the exception.
3649         // 
3650         // 5. If it's a STOP_FRAME, then kill the thread.
3651         // 
3652         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3653         // transaction
3654        
3655         
3656         StgPtr frame;
3657         
3658         frame = sp + 1;
3659         info = get_ret_itbl((StgClosure *)frame);
3660         
3661         while (info->i.type != UPDATE_FRAME
3662                && (info->i.type != CATCH_FRAME || exception == NULL)
3663                && info->i.type != STOP_FRAME
3664                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3665         {
3666             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3667               // IF we find an ATOMICALLY_FRAME then we abort the
3668               // current transaction and propagate the exception.  In
3669               // this case (unlike ordinary exceptions) we do not care
3670               // whether the transaction is valid or not because its
3671               // possible validity cannot have caused the exception
3672               // and will not be visible after the abort.
3673               IF_DEBUG(stm,
3674                        debugBelch("Found atomically block delivering async exception\n"));
3675               stmAbortTransaction(tso -> trec);
3676               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3677             }
3678             frame += stack_frame_sizeW((StgClosure *)frame);
3679             info = get_ret_itbl((StgClosure *)frame);
3680         }
3681         
3682         switch (info->i.type) {
3683             
3684         case ATOMICALLY_FRAME:
3685             ASSERT(stop_at_atomically);
3686             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3687             stmCondemnTransaction(tso -> trec);
3688 #ifdef REG_R1
3689             tso->sp = frame;
3690 #else
3691             // R1 is not a register: the return convention for IO in
3692             // this case puts the return value on the stack, so we
3693             // need to set up the stack to return to the atomically
3694             // frame properly...
3695             tso->sp = frame - 2;
3696             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3697             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3698 #endif
3699             tso->what_next = ThreadRunGHC;
3700             return;
3701
3702         case CATCH_FRAME:
3703             // If we find a CATCH_FRAME, and we've got an exception to raise,
3704             // then build the THUNK raise(exception), and leave it on
3705             // top of the CATCH_FRAME ready to enter.
3706             //
3707         {
3708 #ifdef PROFILING
3709             StgCatchFrame *cf = (StgCatchFrame *)frame;
3710 #endif
3711             StgThunk *raise;
3712             
3713             // we've got an exception to raise, so let's pass it to the
3714             // handler in this frame.
3715             //
3716             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3717             TICK_ALLOC_SE_THK(1,0);
3718             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3719             raise->payload[0] = exception;
3720             
3721             // throw away the stack from Sp up to the CATCH_FRAME.
3722             //
3723             sp = frame - 1;
3724             
3725             /* Ensure that async excpetions are blocked now, so we don't get
3726              * a surprise exception before we get around to executing the
3727              * handler.
3728              */
3729             if (tso->blocked_exceptions == NULL) {
3730                 tso->blocked_exceptions = END_TSO_QUEUE;
3731             }
3732             
3733             /* Put the newly-built THUNK on top of the stack, ready to execute
3734              * when the thread restarts.
3735              */
3736             sp[0] = (W_)raise;
3737             sp[-1] = (W_)&stg_enter_info;
3738             tso->sp = sp-1;
3739             tso->what_next = ThreadRunGHC;
3740             IF_DEBUG(sanity, checkTSO(tso));
3741             return;
3742         }
3743         
3744         case UPDATE_FRAME:
3745         {
3746             StgAP_STACK * ap;
3747             nat words;
3748             
3749             // First build an AP_STACK consisting of the stack chunk above the
3750             // current update frame, with the top word on the stack as the
3751             // fun field.
3752             //
3753             words = frame - sp - 1;
3754             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3755             
3756             ap->size = words;
3757             ap->fun  = (StgClosure *)sp[0];
3758             sp++;
3759             for(i=0; i < (nat)words; ++i) {
3760                 ap->payload[i] = (StgClosure *)*sp++;
3761             }
3762             
3763             SET_HDR(ap,&stg_AP_STACK_info,
3764                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3765             TICK_ALLOC_UP_THK(words+1,0);
3766             
3767             IF_DEBUG(scheduler,
3768                      debugBelch("sched: Updating ");
3769                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3770                      debugBelch(" with ");
3771                      printObj((StgClosure *)ap);
3772                 );
3773
3774             // Replace the updatee with an indirection - happily
3775             // this will also wake up any threads currently
3776             // waiting on the result.
3777             //
3778             // Warning: if we're in a loop, more than one update frame on
3779             // the stack may point to the same object.  Be careful not to
3780             // overwrite an IND_OLDGEN in this case, because we'll screw
3781             // up the mutable lists.  To be on the safe side, don't
3782             // overwrite any kind of indirection at all.  See also
3783             // threadSqueezeStack in GC.c, where we have to make a similar
3784             // check.
3785             //
3786             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3787                 // revert the black hole
3788                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3789                                (StgClosure *)ap);
3790             }
3791             sp += sizeofW(StgUpdateFrame) - 1;
3792             sp[0] = (W_)ap; // push onto stack
3793             break;
3794         }
3795         
3796         case STOP_FRAME:
3797             // We've stripped the entire stack, the thread is now dead.
3798             sp += sizeofW(StgStopFrame);
3799             tso->what_next = ThreadKilled;
3800             tso->sp = sp;
3801             return;
3802             
3803         default:
3804             barf("raiseAsync");
3805         }
3806     }
3807     barf("raiseAsync");
3808 }
3809
3810 /* -----------------------------------------------------------------------------
3811    Deleting threads
3812
3813    This is used for interruption (^C) and forking, and corresponds to
3814    raising an exception but without letting the thread catch the
3815    exception.
3816    -------------------------------------------------------------------------- */
3817
3818 static void 
3819 deleteThread (Capability *cap, StgTSO *tso)
3820 {
3821   if (tso->why_blocked != BlockedOnCCall &&
3822       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3823       raiseAsync(cap,tso,NULL);
3824   }
3825 }
3826
3827 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3828 static void 
3829 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3830 { // for forkProcess only:
3831   // delete thread without giving it a chance to catch the KillThread exception
3832
3833   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3834       return;
3835   }
3836
3837   if (tso->why_blocked != BlockedOnCCall &&
3838       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3839       unblockThread(cap,tso);
3840   }
3841
3842   tso->what_next = ThreadKilled;
3843 }
3844 #endif
3845
3846 /* -----------------------------------------------------------------------------
3847    raiseExceptionHelper
3848    
3849    This function is called by the raise# primitve, just so that we can
3850    move some of the tricky bits of raising an exception from C-- into
3851    C.  Who knows, it might be a useful re-useable thing here too.
3852    -------------------------------------------------------------------------- */
3853
3854 StgWord
3855 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3856 {
3857     Capability *cap = regTableToCapability(reg);
3858     StgThunk *raise_closure = NULL;
3859     StgPtr p, next;
3860     StgRetInfoTable *info;
3861     //
3862     // This closure represents the expression 'raise# E' where E
3863     // is the exception raise.  It is used to overwrite all the
3864     // thunks which are currently under evaluataion.
3865     //
3866
3867     //    
3868     // LDV profiling: stg_raise_info has THUNK as its closure
3869     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3870     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3871     // 1 does not cause any problem unless profiling is performed.
3872     // However, when LDV profiling goes on, we need to linearly scan
3873     // small object pool, where raise_closure is stored, so we should
3874     // use MIN_UPD_SIZE.
3875     //
3876     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3877     //                                 sizeofW(StgClosure)+1);
3878     //
3879
3880     //
3881     // Walk up the stack, looking for the catch frame.  On the way,
3882     // we update any closures pointed to from update frames with the
3883     // raise closure that we just built.
3884     //
3885     p = tso->sp;
3886     while(1) {
3887         info = get_ret_itbl((StgClosure *)p);
3888         next = p + stack_frame_sizeW((StgClosure *)p);
3889         switch (info->i.type) {
3890             
3891         case UPDATE_FRAME:
3892             // Only create raise_closure if we need to.
3893             if (raise_closure == NULL) {
3894                 raise_closure = 
3895                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3896                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3897                 raise_closure->payload[0] = exception;
3898             }
3899             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3900             p = next;
3901             continue;
3902
3903         case ATOMICALLY_FRAME:
3904             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3905             tso->sp = p;
3906             return ATOMICALLY_FRAME;
3907             
3908         case CATCH_FRAME:
3909             tso->sp = p;
3910             return CATCH_FRAME;
3911
3912         case CATCH_STM_FRAME:
3913             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3914             tso->sp = p;
3915             return CATCH_STM_FRAME;
3916             
3917         case STOP_FRAME:
3918             tso->sp = p;
3919             return STOP_FRAME;
3920
3921         case CATCH_RETRY_FRAME:
3922         default:
3923             p = next; 
3924             continue;
3925         }
3926     }
3927 }
3928
3929
3930 /* -----------------------------------------------------------------------------
3931    findRetryFrameHelper
3932
3933    This function is called by the retry# primitive.  It traverses the stack
3934    leaving tso->sp referring to the frame which should handle the retry.  
3935
3936    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3937    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3938
3939    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3940    despite the similar implementation.
3941
3942    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3943    not be created within memory transactions.
3944    -------------------------------------------------------------------------- */
3945
3946 StgWord
3947 findRetryFrameHelper (StgTSO *tso)
3948 {
3949   StgPtr           p, next;
3950   StgRetInfoTable *info;
3951
3952   p = tso -> sp;
3953   while (1) {
3954     info = get_ret_itbl((StgClosure *)p);
3955     next = p + stack_frame_sizeW((StgClosure *)p);
3956     switch (info->i.type) {
3957       
3958     case ATOMICALLY_FRAME:
3959       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3960       tso->sp = p;
3961       return ATOMICALLY_FRAME;
3962       
3963     case CATCH_RETRY_FRAME:
3964       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3965       tso->sp = p;
3966       return CATCH_RETRY_FRAME;
3967       
3968     case CATCH_STM_FRAME:
3969     default:
3970       ASSERT(info->i.type != CATCH_FRAME);
3971       ASSERT(info->i.type != STOP_FRAME);
3972       p = next; 
3973       continue;
3974     }
3975   }
3976 }
3977
3978 /* -----------------------------------------------------------------------------
3979    resurrectThreads is called after garbage collection on the list of
3980    threads found to be garbage.  Each of these threads will be woken
3981    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3982    on an MVar, or NonTermination if the thread was blocked on a Black
3983    Hole.
3984
3985    Locks: assumes we hold *all* the capabilities.
3986    -------------------------------------------------------------------------- */
3987
3988 void
3989 resurrectThreads (StgTSO *threads)
3990 {
3991     StgTSO *tso, *next;
3992     Capability *cap;
3993
3994     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3995         next = tso->global_link;
3996         tso->global_link = all_threads;
3997         all_threads = tso;
3998         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3999         
4000         // Wake up the thread on the Capability it was last on for a
4001         // bound thread, or last_free_capability otherwise.
4002         if (tso->bound) {
4003             cap = tso->bound->cap;
4004         } else {
4005             cap = last_free_capability;
4006         }
4007         
4008         switch (tso->why_blocked) {
4009         case BlockedOnMVar:
4010         case BlockedOnException:
4011             /* Called by GC - sched_mutex lock is currently held. */
4012             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4013             break;
4014         case BlockedOnBlackHole:
4015             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4016             break;
4017         case BlockedOnSTM:
4018             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4019             break;
4020         case NotBlocked:
4021             /* This might happen if the thread was blocked on a black hole
4022              * belonging to a thread that we've just woken up (raiseAsync
4023              * can wake up threads, remember...).
4024              */
4025             continue;
4026         default:
4027             barf("resurrectThreads: thread blocked in a strange way");
4028         }
4029     }
4030 }
4031
4032 /* ----------------------------------------------------------------------------
4033  * Debugging: why is a thread blocked
4034  * [Also provides useful information when debugging threaded programs
4035  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4036    ------------------------------------------------------------------------- */
4037
4038 #if DEBUG
4039 static void
4040 printThreadBlockage(StgTSO *tso)
4041 {
4042   switch (tso->why_blocked) {
4043   case BlockedOnRead:
4044     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4045     break;
4046   case BlockedOnWrite:
4047     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4048     break;
4049 #if defined(mingw32_HOST_OS)
4050     case BlockedOnDoProc:
4051     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4052     break;
4053 #endif
4054   case BlockedOnDelay:
4055     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4056     break;
4057   case BlockedOnMVar:
4058     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4059     break;
4060   case BlockedOnException:
4061     debugBelch("is blocked on delivering an exception to thread %d",
4062             tso->block_info.tso->id);
4063     break;
4064   case BlockedOnBlackHole:
4065     debugBelch("is blocked on a black hole");
4066     break;
4067   case NotBlocked:
4068     debugBelch("is not blocked");
4069     break;
4070 #if defined(PARALLEL_HASKELL)
4071   case BlockedOnGA:
4072     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4073             tso->block_info.closure, info_type(tso->block_info.closure));
4074     break;
4075   case BlockedOnGA_NoSend:
4076     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4077             tso->block_info.closure, info_type(tso->block_info.closure));
4078     break;
4079 #endif
4080   case BlockedOnCCall:
4081     debugBelch("is blocked on an external call");
4082     break;
4083   case BlockedOnCCall_NoUnblockExc:
4084     debugBelch("is blocked on an external call (exceptions were already blocked)");
4085     break;
4086   case BlockedOnSTM:
4087     debugBelch("is blocked on an STM operation");
4088     break;
4089   default:
4090     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4091          tso->why_blocked, tso->id, tso);
4092   }
4093 }
4094
4095 void
4096 printThreadStatus(StgTSO *t)
4097 {
4098     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4099     {
4100       void *label = lookupThreadLabel(t->id);
4101       if (label) debugBelch("[\"%s\"] ",(char *)label);
4102     }
4103     if (t->what_next == ThreadRelocated) {
4104         debugBelch("has been relocated...\n");
4105     } else {
4106         switch (t->what_next) {
4107         case ThreadKilled:
4108             debugBelch("has been killed");
4109             break;
4110         case ThreadComplete:
4111             debugBelch("has completed");
4112             break;
4113         default:
4114             printThreadBlockage(t);
4115         }
4116         debugBelch("\n");
4117     }
4118 }
4119
4120 void
4121 printAllThreads(void)
4122 {
4123   StgTSO *t, *next;
4124   nat i;
4125   Capability *cap;
4126
4127 # if defined(GRAN)
4128   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4129   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4130                        time_string, rtsFalse/*no commas!*/);
4131
4132   debugBelch("all threads at [%s]:\n", time_string);
4133 # elif defined(PARALLEL_HASKELL)
4134   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4135   ullong_format_string(CURRENT_TIME,
4136                        time_string, rtsFalse/*no commas!*/);
4137
4138   debugBelch("all threads at [%s]:\n", time_string);
4139 # else
4140   debugBelch("all threads:\n");
4141 # endif
4142
4143   for (i = 0; i < n_capabilities; i++) {
4144       cap = &capabilities[i];
4145       debugBelch("threads on capability %d:\n", cap->no);
4146       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4147           printThreadStatus(t);
4148       }
4149   }
4150
4151   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4152       if (t->why_blocked != NotBlocked) {
4153           printThreadStatus(t);
4154       }
4155       if (t->what_next == ThreadRelocated) {
4156           next = t->link;
4157       } else {
4158           next = t->global_link;
4159       }
4160   }
4161 }
4162
4163 // useful from gdb
4164 void 
4165 printThreadQueue(StgTSO *t)
4166 {
4167     nat i = 0;
4168     for (; t != END_TSO_QUEUE; t = t->link) {
4169         printThreadStatus(t);
4170         i++;
4171     }
4172     debugBelch("%d threads on queue\n", i);
4173 }
4174
4175 /* 
4176    Print a whole blocking queue attached to node (debugging only).
4177 */
4178 # if defined(PARALLEL_HASKELL)
4179 void 
4180 print_bq (StgClosure *node)
4181 {
4182   StgBlockingQueueElement *bqe;
4183   StgTSO *tso;
4184   rtsBool end;
4185
4186   debugBelch("## BQ of closure %p (%s): ",
4187           node, info_type(node));
4188
4189   /* should cover all closures that may have a blocking queue */
4190   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4191          get_itbl(node)->type == FETCH_ME_BQ ||
4192          get_itbl(node)->type == RBH ||
4193          get_itbl(node)->type == MVAR);
4194     
4195   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4196
4197   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4198 }
4199
4200 /* 
4201    Print a whole blocking queue starting with the element bqe.
4202 */
4203 void 
4204 print_bqe (StgBlockingQueueElement *bqe)
4205 {
4206   rtsBool end;
4207
4208   /* 
4209      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4210   */
4211   for (end = (bqe==END_BQ_QUEUE);
4212        !end; // iterate until bqe points to a CONSTR
4213        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4214        bqe = end ? END_BQ_QUEUE : bqe->link) {
4215     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4216     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4217     /* types of closures that may appear in a blocking queue */
4218     ASSERT(get_itbl(bqe)->type == TSO ||           
4219            get_itbl(bqe)->type == BLOCKED_FETCH || 
4220            get_itbl(bqe)->type == CONSTR); 
4221     /* only BQs of an RBH end with an RBH_Save closure */
4222     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4223
4224     switch (get_itbl(bqe)->type) {
4225     case TSO:
4226       debugBelch(" TSO %u (%x),",
4227               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4228       break;
4229     case BLOCKED_FETCH:
4230       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4231               ((StgBlockedFetch *)bqe)->node, 
4232               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4233               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4234               ((StgBlockedFetch *)bqe)->ga.weight);
4235       break;
4236     case CONSTR:
4237       debugBelch(" %s (IP %p),",
4238               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4239                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4240                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4241                "RBH_Save_?"), get_itbl(bqe));
4242       break;
4243     default:
4244       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4245            info_type((StgClosure *)bqe)); // , node, info_type(node));
4246       break;
4247     }
4248   } /* for */
4249   debugBelch("\n");
4250 }
4251 # elif defined(GRAN)
4252 void 
4253 print_bq (StgClosure *node)
4254 {
4255   StgBlockingQueueElement *bqe;
4256   PEs node_loc, tso_loc;
4257   rtsBool end;
4258
4259   /* should cover all closures that may have a blocking queue */
4260   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4261          get_itbl(node)->type == FETCH_ME_BQ ||
4262          get_itbl(node)->type == RBH);
4263     
4264   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4265   node_loc = where_is(node);
4266
4267   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4268           node, info_type(node), node_loc);
4269
4270   /* 
4271      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4272   */
4273   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4274        !end; // iterate until bqe points to a CONSTR
4275        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4276     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4277     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4278     /* types of closures that may appear in a blocking queue */
4279     ASSERT(get_itbl(bqe)->type == TSO ||           
4280            get_itbl(bqe)->type == CONSTR); 
4281     /* only BQs of an RBH end with an RBH_Save closure */
4282     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4283
4284     tso_loc = where_is((StgClosure *)bqe);
4285     switch (get_itbl(bqe)->type) {
4286     case TSO:
4287       debugBelch(" TSO %d (%p) on [PE %d],",
4288               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4289       break;
4290     case CONSTR:
4291       debugBelch(" %s (IP %p),",
4292               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4293                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4294                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4295                "RBH_Save_?"), get_itbl(bqe));
4296       break;
4297     default:
4298       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4299            info_type((StgClosure *)bqe), node, info_type(node));
4300       break;
4301     }
4302   } /* for */
4303   debugBelch("\n");
4304 }
4305 # endif
4306
4307 #if defined(PARALLEL_HASKELL)
4308 static nat
4309 run_queue_len(void)
4310 {
4311     nat i;
4312     StgTSO *tso;
4313     
4314     for (i=0, tso=run_queue_hd; 
4315          tso != END_TSO_QUEUE;
4316          i++, tso=tso->link) {
4317         /* nothing */
4318     }
4319         
4320     return i;
4321 }
4322 #endif
4323
4324 void
4325 sched_belch(char *s, ...)
4326 {
4327     va_list ap;
4328     va_start(ap,s);
4329 #ifdef THREADED_RTS
4330     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4331 #elif defined(PARALLEL_HASKELL)
4332     debugBelch("== ");
4333 #else
4334     debugBelch("sched: ");
4335 #endif
4336     vdebugBelch(s, ap);
4337     debugBelch("\n");
4338     va_end(ap);
4339 }
4340
4341 #endif /* DEBUG */