[project @ 2005-11-04 15:33:36 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 #ifdef THREADED_RTS
76 #define USED_WHEN_THREADED_RTS
77 #define USED_WHEN_NON_THREADED_RTS STG_UNUSED
78 #else
79 #define USED_WHEN_THREADED_RTS     STG_UNUSED
80 #define USED_WHEN_NON_THREADED_RTS
81 #endif
82
83 #ifdef SMP
84 #define USED_WHEN_SMP
85 #else
86 #define USED_WHEN_SMP STG_UNUSED
87 #endif
88
89 /* -----------------------------------------------------------------------------
90  * Global variables
91  * -------------------------------------------------------------------------- */
92
93 #if defined(GRAN)
94
95 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
96 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
97
98 /* 
99    In GranSim we have a runnable and a blocked queue for each processor.
100    In order to minimise code changes new arrays run_queue_hds/tls
101    are created. run_queue_hd is then a short cut (macro) for
102    run_queue_hds[CurrentProc] (see GranSim.h).
103    -- HWL
104 */
105 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
106 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
107 StgTSO *ccalling_threadss[MAX_PROC];
108 /* We use the same global list of threads (all_threads) in GranSim as in
109    the std RTS (i.e. we are cheating). However, we don't use this list in
110    the GranSim specific code at the moment (so we are only potentially
111    cheating).  */
112
113 #else /* !GRAN */
114
115 #if !defined(THREADED_RTS)
116 // Blocked/sleeping thrads
117 StgTSO *blocked_queue_hd = NULL;
118 StgTSO *blocked_queue_tl = NULL;
119 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
120 #endif
121
122 /* Threads blocked on blackholes.
123  * LOCK: sched_mutex+capability, or all capabilities
124  */
125 StgTSO *blackhole_queue = NULL;
126 #endif
127
128 /* The blackhole_queue should be checked for threads to wake up.  See
129  * Schedule.h for more thorough comment.
130  * LOCK: none (doesn't matter if we miss an update)
131  */
132 rtsBool blackholes_need_checking = rtsFalse;
133
134 /* Linked list of all threads.
135  * Used for detecting garbage collected threads.
136  * LOCK: sched_mutex+capability, or all capabilities
137  */
138 StgTSO *all_threads = NULL;
139
140 /* flag set by signal handler to precipitate a context switch
141  * LOCK: none (just an advisory flag)
142  */
143 int context_switch = 0;
144
145 /* flag that tracks whether we have done any execution in this time slice.
146  * LOCK: currently none, perhaps we should lock (but needs to be
147  * updated in the fast path of the scheduler).
148  */
149 nat recent_activity = ACTIVITY_YES;
150
151 /* if this flag is set as well, give up execution
152  * LOCK: none (changes once, from false->true)
153  */
154 rtsBool interrupted = rtsFalse;
155
156 /* Next thread ID to allocate.
157  * LOCK: sched_mutex
158  */
159 static StgThreadID next_thread_id = 1;
160
161 /* The smallest stack size that makes any sense is:
162  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
163  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
164  *  + 1                       (the closure to enter)
165  *  + 1                       (stg_ap_v_ret)
166  *  + 1                       (spare slot req'd by stg_ap_v_ret)
167  *
168  * A thread with this stack will bomb immediately with a stack
169  * overflow, which will increase its stack size.  
170  */
171 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
172
173 #if defined(GRAN)
174 StgTSO *CurrentTSO;
175 #endif
176
177 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
178  *  exists - earlier gccs apparently didn't.
179  *  -= chak
180  */
181 StgTSO dummy_tso;
182
183 /*
184  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
185  * in an MT setting, needed to signal that a worker thread shouldn't hang around
186  * in the scheduler when it is out of work.
187  */
188 rtsBool shutting_down_scheduler = rtsFalse;
189
190 /*
191  * This mutex protects most of the global scheduler data in
192  * the THREADED_RTS and (inc. SMP) runtime.
193  */
194 #if defined(THREADED_RTS)
195 Mutex sched_mutex;
196 #endif
197
198 #if defined(PARALLEL_HASKELL)
199 StgTSO *LastTSO;
200 rtsTime TimeOfLastYield;
201 rtsBool emitSchedule = rtsTrue;
202 #endif
203
204 /* -----------------------------------------------------------------------------
205  * static function prototypes
206  * -------------------------------------------------------------------------- */
207
208 static Capability *schedule (Capability *initialCapability, Task *task);
209
210 //
211 // These function all encapsulate parts of the scheduler loop, and are
212 // abstracted only to make the structure and control flow of the
213 // scheduler clearer.
214 //
215 static void schedulePreLoop (void);
216 #if defined(SMP)
217 static void schedulePushWork(Capability *cap, Task *task);
218 #endif
219 static void scheduleStartSignalHandlers (Capability *cap);
220 static void scheduleCheckBlockedThreads (Capability *cap);
221 static void scheduleCheckBlackHoles (Capability *cap);
222 static void scheduleDetectDeadlock (Capability *cap, Task *task);
223 #if defined(GRAN)
224 static StgTSO *scheduleProcessEvent(rtsEvent *event);
225 #endif
226 #if defined(PARALLEL_HASKELL)
227 static StgTSO *scheduleSendPendingMessages(void);
228 static void scheduleActivateSpark(void);
229 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
230 #endif
231 #if defined(PAR) || defined(GRAN)
232 static void scheduleGranParReport(void);
233 #endif
234 static void schedulePostRunThread(void);
235 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
236 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
237                                          StgTSO *t);
238 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
239                                     nat prev_what_next );
240 static void scheduleHandleThreadBlocked( StgTSO *t );
241 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
242                                              StgTSO *t );
243 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
244 static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major);
245
246 static void unblockThread(Capability *cap, StgTSO *tso);
247 static rtsBool checkBlackHoles(Capability *cap);
248 static void AllRoots(evac_fn evac);
249
250 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
251
252 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
253                         rtsBool stop_at_atomically);
254
255 static void deleteThread (Capability *cap, StgTSO *tso);
256 static void deleteRunQueue (Capability *cap);
257
258 #ifdef DEBUG
259 static void printThreadBlockage(StgTSO *tso);
260 static void printThreadStatus(StgTSO *tso);
261 void printThreadQueue(StgTSO *tso);
262 #endif
263
264 #if defined(PARALLEL_HASKELL)
265 StgTSO * createSparkThread(rtsSpark spark);
266 StgTSO * activateSpark (rtsSpark spark);  
267 #endif
268
269 #ifdef DEBUG
270 static char *whatNext_strs[] = {
271   "(unknown)",
272   "ThreadRunGHC",
273   "ThreadInterpret",
274   "ThreadKilled",
275   "ThreadRelocated",
276   "ThreadComplete"
277 };
278 #endif
279
280 /* -----------------------------------------------------------------------------
281  * Putting a thread on the run queue: different scheduling policies
282  * -------------------------------------------------------------------------- */
283
284 STATIC_INLINE void
285 addToRunQueue( Capability *cap, StgTSO *t )
286 {
287 #if defined(PARALLEL_HASKELL)
288     if (RtsFlags.ParFlags.doFairScheduling) { 
289         // this does round-robin scheduling; good for concurrency
290         appendToRunQueue(cap,t);
291     } else {
292         // this does unfair scheduling; good for parallelism
293         pushOnRunQueue(cap,t);
294     }
295 #else
296     // this does round-robin scheduling; good for concurrency
297     appendToRunQueue(cap,t);
298 #endif
299 }
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    GRAN version:
314      In a GranSim setup this loop iterates over the global event queue.
315      This revolves around the global event queue, which determines what 
316      to do next. Therefore, it's more complicated than either the 
317      concurrent or the parallel (GUM) setup.
318
319    GUM version:
320      GUM iterates over incoming messages.
321      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
322      and sends out a fish whenever it has nothing to do; in-between
323      doing the actual reductions (shared code below) it processes the
324      incoming messages and deals with delayed operations 
325      (see PendingFetches).
326      This is not the ugliest code you could imagine, but it's bloody close.
327
328    ------------------------------------------------------------------------ */
329
330 static Capability *
331 schedule (Capability *initialCapability, Task *task)
332 {
333   StgTSO *t;
334   Capability *cap;
335   StgThreadReturnCode ret;
336 #if defined(GRAN)
337   rtsEvent *event;
338 #elif defined(PARALLEL_HASKELL)
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   nat prev_what_next;
347   rtsBool ready_to_gc;
348 #if defined(THREADED_RTS)
349   rtsBool first = rtsTrue;
350 #endif
351   
352   cap = initialCapability;
353
354   // Pre-condition: this task owns initialCapability.
355   // The sched_mutex is *NOT* held
356   // NB. on return, we still hold a capability.
357
358   IF_DEBUG(scheduler,
359            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
360                        task, initialCapability);
361       );
362
363   schedulePreLoop();
364
365   // -----------------------------------------------------------
366   // Scheduler loop starts here:
367
368 #if defined(PARALLEL_HASKELL)
369 #define TERMINATION_CONDITION        (!receivedFinish)
370 #elif defined(GRAN)
371 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
372 #else
373 #define TERMINATION_CONDITION        rtsTrue
374 #endif
375
376   while (TERMINATION_CONDITION) {
377
378 #if defined(GRAN)
379       /* Choose the processor with the next event */
380       CurrentProc = event->proc;
381       CurrentTSO = event->tso;
382 #endif
383
384 #if defined(THREADED_RTS)
385       if (first) {
386           // don't yield the first time, we want a chance to run this
387           // thread for a bit, even if there are others banging at the
388           // door.
389           first = rtsFalse;
390           ASSERT_CAPABILITY_INVARIANTS(cap,task);
391       } else {
392           // Yield the capability to higher-priority tasks if necessary.
393           yieldCapability(&cap, task);
394       }
395 #endif
396       
397 #ifdef SMP
398       schedulePushWork(cap,task);
399 #endif          
400
401     // Check whether we have re-entered the RTS from Haskell without
402     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
403     // call).
404     if (cap->in_haskell) {
405           errorBelch("schedule: re-entered unsafely.\n"
406                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
407           stg_exit(EXIT_FAILURE);
408     }
409
410     //
411     // Test for interruption.  If interrupted==rtsTrue, then either
412     // we received a keyboard interrupt (^C), or the scheduler is
413     // trying to shut down all the tasks (shutting_down_scheduler) in
414     // the threaded RTS.
415     //
416     if (interrupted) {
417         deleteRunQueue(cap);
418         if (shutting_down_scheduler) {
419             IF_DEBUG(scheduler, sched_belch("shutting down"));
420             // If we are a worker, just exit.  If we're a bound thread
421             // then we will exit below when we've removed our TSO from
422             // the run queue.
423             if (task->tso == NULL && emptyRunQueue(cap)) {
424                 return cap;
425             }
426         } else {
427             IF_DEBUG(scheduler, sched_belch("interrupted"));
428         }
429     }
430
431 #if defined(not_yet) && defined(SMP)
432     //
433     // Top up the run queue from our spark pool.  We try to make the
434     // number of threads in the run queue equal to the number of
435     // free capabilities.
436     //
437     {
438         StgClosure *spark;
439         if (emptyRunQueue()) {
440             spark = findSpark(rtsFalse);
441             if (spark == NULL) {
442                 break; /* no more sparks in the pool */
443             } else {
444                 createSparkThread(spark);         
445                 IF_DEBUG(scheduler,
446                          sched_belch("==^^ turning spark of closure %p into a thread",
447                                      (StgClosure *)spark));
448             }
449         }
450     }
451 #endif // SMP
452
453     scheduleStartSignalHandlers(cap);
454
455     // Only check the black holes here if we've nothing else to do.
456     // During normal execution, the black hole list only gets checked
457     // at GC time, to avoid repeatedly traversing this possibly long
458     // list each time around the scheduler.
459     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
460
461     scheduleCheckBlockedThreads(cap);
462
463     scheduleDetectDeadlock(cap,task);
464
465     // Normally, the only way we can get here with no threads to
466     // run is if a keyboard interrupt received during 
467     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
468     // Additionally, it is not fatal for the
469     // threaded RTS to reach here with no threads to run.
470     //
471     // win32: might be here due to awaitEvent() being abandoned
472     // as a result of a console event having been delivered.
473     if ( emptyRunQueue(cap) ) {
474 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
475         ASSERT(interrupted);
476 #endif
477         continue; // nothing to do
478     }
479
480 #if defined(PARALLEL_HASKELL)
481     scheduleSendPendingMessages();
482     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
483         continue;
484
485 #if defined(SPARKS)
486     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
487 #endif
488
489     /* If we still have no work we need to send a FISH to get a spark
490        from another PE */
491     if (emptyRunQueue(cap)) {
492         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
493         ASSERT(rtsFalse); // should not happen at the moment
494     }
495     // from here: non-empty run queue.
496     //  TODO: merge above case with this, only one call processMessages() !
497     if (PacketsWaiting()) {  /* process incoming messages, if
498                                 any pending...  only in else
499                                 because getRemoteWork waits for
500                                 messages as well */
501         receivedFinish = processMessages();
502     }
503 #endif
504
505 #if defined(GRAN)
506     scheduleProcessEvent(event);
507 #endif
508
509     // 
510     // Get a thread to run
511     //
512     t = popRunQueue(cap);
513
514 #if defined(GRAN) || defined(PAR)
515     scheduleGranParReport(); // some kind of debuging output
516 #else
517     // Sanity check the thread we're about to run.  This can be
518     // expensive if there is lots of thread switching going on...
519     IF_DEBUG(sanity,checkTSO(t));
520 #endif
521
522 #if defined(THREADED_RTS)
523     // Check whether we can run this thread in the current task.
524     // If not, we have to pass our capability to the right task.
525     {
526         Task *bound = t->bound;
527       
528         if (bound) {
529             if (bound == task) {
530                 IF_DEBUG(scheduler,
531                          sched_belch("### Running thread %d in bound thread",
532                                      t->id));
533                 // yes, the Haskell thread is bound to the current native thread
534             } else {
535                 IF_DEBUG(scheduler,
536                          sched_belch("### thread %d bound to another OS thread",
537                                      t->id));
538                 // no, bound to a different Haskell thread: pass to that thread
539                 pushOnRunQueue(cap,t);
540                 continue;
541             }
542         } else {
543             // The thread we want to run is unbound.
544             if (task->tso) { 
545                 IF_DEBUG(scheduler,
546                          sched_belch("### this OS thread cannot run thread %d", t->id));
547                 // no, the current native thread is bound to a different
548                 // Haskell thread, so pass it to any worker thread
549                 pushOnRunQueue(cap,t);
550                 continue; 
551             }
552         }
553     }
554 #endif
555
556     cap->r.rCurrentTSO = t;
557     
558     /* context switches are initiated by the timer signal, unless
559      * the user specified "context switch as often as possible", with
560      * +RTS -C0
561      */
562     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
563         && !emptyThreadQueues(cap)) {
564         context_switch = 1;
565     }
566          
567 run_thread:
568
569     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
570                               (long)t->id, whatNext_strs[t->what_next]));
571
572 #if defined(PROFILING)
573     startHeapProfTimer();
574 #endif
575
576     // ----------------------------------------------------------------------
577     // Run the current thread 
578
579     prev_what_next = t->what_next;
580
581     errno = t->saved_errno;
582     cap->in_haskell = rtsTrue;
583
584     recent_activity = ACTIVITY_YES;
585
586     switch (prev_what_next) {
587         
588     case ThreadKilled:
589     case ThreadComplete:
590         /* Thread already finished, return to scheduler. */
591         ret = ThreadFinished;
592         break;
593         
594     case ThreadRunGHC:
595     {
596         StgRegTable *r;
597         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
598         cap = regTableToCapability(r);
599         ret = r->rRet;
600         break;
601     }
602     
603     case ThreadInterpret:
604         cap = interpretBCO(cap);
605         ret = cap->r.rRet;
606         break;
607         
608     default:
609         barf("schedule: invalid what_next field");
610     }
611
612     cap->in_haskell = rtsFalse;
613
614     // The TSO might have moved, eg. if it re-entered the RTS and a GC
615     // happened.  So find the new location:
616     t = cap->r.rCurrentTSO;
617
618 #ifdef SMP
619     // If ret is ThreadBlocked, and this Task is bound to the TSO that
620     // blocked, we are in limbo - the TSO is now owned by whatever it
621     // is blocked on, and may in fact already have been woken up,
622     // perhaps even on a different Capability.  It may be the case
623     // that task->cap != cap.  We better yield this Capability
624     // immediately and return to normaility.
625     if (ret == ThreadBlocked) {
626         IF_DEBUG(scheduler,
627                  debugBelch("--<< thread %d (%s) stopped: blocked\n",
628                             t->id, whatNext_strs[t->what_next]));
629         continue;
630     }
631 #endif
632
633     ASSERT_CAPABILITY_INVARIANTS(cap,task);
634
635     // And save the current errno in this thread.
636     t->saved_errno = errno;
637
638     // ----------------------------------------------------------------------
639     
640     // Costs for the scheduler are assigned to CCS_SYSTEM
641 #if defined(PROFILING)
642     stopHeapProfTimer();
643     CCCS = CCS_SYSTEM;
644 #endif
645     
646     // We have run some Haskell code: there might be blackhole-blocked
647     // threads to wake up now.
648     // Lock-free test here should be ok, we're just setting a flag.
649     if ( blackhole_queue != END_TSO_QUEUE ) {
650         blackholes_need_checking = rtsTrue;
651     }
652     
653 #if defined(THREADED_RTS)
654     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
655 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
656     IF_DEBUG(scheduler,debugBelch("sched: "););
657 #endif
658     
659     schedulePostRunThread();
660
661     ready_to_gc = rtsFalse;
662
663     switch (ret) {
664     case HeapOverflow:
665         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
666         break;
667
668     case StackOverflow:
669         scheduleHandleStackOverflow(cap,task,t);
670         break;
671
672     case ThreadYielding:
673         if (scheduleHandleYield(cap, t, prev_what_next)) {
674             // shortcut for switching between compiler/interpreter:
675             goto run_thread; 
676         }
677         break;
678
679     case ThreadBlocked:
680         scheduleHandleThreadBlocked(t);
681         break;
682
683     case ThreadFinished:
684         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
685         ASSERT_CAPABILITY_INVARIANTS(cap,task);
686         break;
687
688     default:
689       barf("schedule: invalid thread return code %d", (int)ret);
690     }
691
692     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
693     if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); }
694   } /* end of while() */
695
696   IF_PAR_DEBUG(verbose,
697                debugBelch("== Leaving schedule() after having received Finish\n"));
698 }
699
700 /* ----------------------------------------------------------------------------
701  * Setting up the scheduler loop
702  * ------------------------------------------------------------------------- */
703
704 static void
705 schedulePreLoop(void)
706 {
707 #if defined(GRAN) 
708     /* set up first event to get things going */
709     /* ToDo: assign costs for system setup and init MainTSO ! */
710     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
711               ContinueThread, 
712               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
713     
714     IF_DEBUG(gran,
715              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
716                         CurrentTSO);
717              G_TSO(CurrentTSO, 5));
718     
719     if (RtsFlags.GranFlags.Light) {
720         /* Save current time; GranSim Light only */
721         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
722     }      
723 #endif
724 }
725
726 /* -----------------------------------------------------------------------------
727  * schedulePushWork()
728  *
729  * Push work to other Capabilities if we have some.
730  * -------------------------------------------------------------------------- */
731
732 #ifdef SMP
733 static void
734 schedulePushWork(Capability *cap USED_WHEN_SMP, 
735                  Task *task      USED_WHEN_SMP)
736 {
737     Capability *free_caps[n_capabilities], *cap0;
738     nat i, n_free_caps;
739
740     // Check whether we have more threads on our run queue that we
741     // could hand to another Capability.
742     if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
743         return;
744     }
745
746     // First grab as many free Capabilities as we can.
747     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
748         cap0 = &capabilities[i];
749         if (cap != cap0 && tryGrabCapability(cap0,task)) {
750             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
751                 // it already has some work, we just grabbed it at 
752                 // the wrong moment.  Or maybe it's deadlocked!
753                 releaseCapability(cap0);
754             } else {
755                 free_caps[n_free_caps++] = cap0;
756             }
757         }
758     }
759
760     // we now have n_free_caps free capabilities stashed in
761     // free_caps[].  Share our run queue equally with them.  This is
762     // probably the simplest thing we could do; improvements we might
763     // want to do include:
764     //
765     //   - giving high priority to moving relatively new threads, on 
766     //     the gournds that they haven't had time to build up a
767     //     working set in the cache on this CPU/Capability.
768     //
769     //   - giving low priority to moving long-lived threads
770
771     if (n_free_caps > 0) {
772         StgTSO *prev, *t, *next;
773         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
774
775         prev = cap->run_queue_hd;
776         t = prev->link;
777         prev->link = END_TSO_QUEUE;
778         i = 0;
779         for (; t != END_TSO_QUEUE; t = next) {
780             next = t->link;
781             t->link = END_TSO_QUEUE;
782             if (t->what_next == ThreadRelocated
783                 || t->bound == task) { // don't move my bound thread
784                 prev->link = t;
785                 prev = t;
786             } else if (i == n_free_caps) {
787                 i = 0;
788                 // keep one for us
789                 prev->link = t;
790                 prev = t;
791             } else {
792                 appendToRunQueue(free_caps[i],t);
793                 if (t->bound) { t->bound->cap = free_caps[i]; }
794                 i++;
795             }
796         }
797         cap->run_queue_tl = prev;
798
799         // release the capabilities
800         for (i = 0; i < n_free_caps; i++) {
801             task->cap = free_caps[i];
802             releaseCapability(free_caps[i]);
803         }
804     }
805     task->cap = cap; // reset to point to our Capability.
806 }
807 #endif
808
809 /* ----------------------------------------------------------------------------
810  * Start any pending signal handlers
811  * ------------------------------------------------------------------------- */
812
813 static void
814 scheduleStartSignalHandlers(Capability *cap)
815 {
816 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
817     if (signals_pending()) { // safe outside the lock
818         startSignalHandlers(cap);
819     }
820 #endif
821 }
822
823 /* ----------------------------------------------------------------------------
824  * Check for blocked threads that can be woken up.
825  * ------------------------------------------------------------------------- */
826
827 static void
828 scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS)
829 {
830 #if !defined(THREADED_RTS)
831     //
832     // Check whether any waiting threads need to be woken up.  If the
833     // run queue is empty, and there are no other tasks running, we
834     // can wait indefinitely for something to happen.
835     //
836     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
837     {
838         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
839     }
840 #endif
841 }
842
843
844 /* ----------------------------------------------------------------------------
845  * Check for threads blocked on BLACKHOLEs that can be woken up
846  * ------------------------------------------------------------------------- */
847 static void
848 scheduleCheckBlackHoles (Capability *cap)
849 {
850     if ( blackholes_need_checking ) // check without the lock first
851     {
852         ACQUIRE_LOCK(&sched_mutex);
853         if ( blackholes_need_checking ) {
854             checkBlackHoles(cap);
855             blackholes_need_checking = rtsFalse;
856         }
857         RELEASE_LOCK(&sched_mutex);
858     }
859 }
860
861 /* ----------------------------------------------------------------------------
862  * Detect deadlock conditions and attempt to resolve them.
863  * ------------------------------------------------------------------------- */
864
865 static void
866 scheduleDetectDeadlock (Capability *cap, Task *task)
867 {
868
869 #if defined(PARALLEL_HASKELL)
870     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
871     return;
872 #endif
873
874     /* 
875      * Detect deadlock: when we have no threads to run, there are no
876      * threads blocked, waiting for I/O, or sleeping, and all the
877      * other tasks are waiting for work, we must have a deadlock of
878      * some description.
879      */
880     if ( emptyThreadQueues(cap) )
881     {
882 #if defined(THREADED_RTS)
883         /* 
884          * In the threaded RTS, we only check for deadlock if there
885          * has been no activity in a complete timeslice.  This means
886          * we won't eagerly start a full GC just because we don't have
887          * any threads to run currently.
888          */
889         if (recent_activity != ACTIVITY_INACTIVE) return;
890 #endif
891
892         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
893
894         // Garbage collection can release some new threads due to
895         // either (a) finalizers or (b) threads resurrected because
896         // they are unreachable and will therefore be sent an
897         // exception.  Any threads thus released will be immediately
898         // runnable.
899         scheduleDoGC( cap, task, rtsTrue/*force  major GC*/ );
900         recent_activity = ACTIVITY_DONE_GC;
901         
902         if ( !emptyRunQueue(cap) ) return;
903
904 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
905         /* If we have user-installed signal handlers, then wait
906          * for signals to arrive rather then bombing out with a
907          * deadlock.
908          */
909         if ( anyUserHandlers() ) {
910             IF_DEBUG(scheduler, 
911                      sched_belch("still deadlocked, waiting for signals..."));
912
913             awaitUserSignals();
914
915             if (signals_pending()) {
916                 startSignalHandlers(cap);
917             }
918
919             // either we have threads to run, or we were interrupted:
920             ASSERT(!emptyRunQueue(cap) || interrupted);
921         }
922 #endif
923
924 #if !defined(THREADED_RTS)
925         /* Probably a real deadlock.  Send the current main thread the
926          * Deadlock exception.
927          */
928         if (task->tso) {
929             switch (task->tso->why_blocked) {
930             case BlockedOnSTM:
931             case BlockedOnBlackHole:
932             case BlockedOnException:
933             case BlockedOnMVar:
934                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
935                 return;
936             default:
937                 barf("deadlock: main thread blocked in a strange way");
938             }
939         }
940         return;
941 #endif
942     }
943 }
944
945 /* ----------------------------------------------------------------------------
946  * Process an event (GRAN only)
947  * ------------------------------------------------------------------------- */
948
949 #if defined(GRAN)
950 static StgTSO *
951 scheduleProcessEvent(rtsEvent *event)
952 {
953     StgTSO *t;
954
955     if (RtsFlags.GranFlags.Light)
956       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
957
958     /* adjust time based on time-stamp */
959     if (event->time > CurrentTime[CurrentProc] &&
960         event->evttype != ContinueThread)
961       CurrentTime[CurrentProc] = event->time;
962     
963     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
964     if (!RtsFlags.GranFlags.Light)
965       handleIdlePEs();
966
967     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
968
969     /* main event dispatcher in GranSim */
970     switch (event->evttype) {
971       /* Should just be continuing execution */
972     case ContinueThread:
973       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
974       /* ToDo: check assertion
975       ASSERT(run_queue_hd != (StgTSO*)NULL &&
976              run_queue_hd != END_TSO_QUEUE);
977       */
978       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
979       if (!RtsFlags.GranFlags.DoAsyncFetch &&
980           procStatus[CurrentProc]==Fetching) {
981         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
982               CurrentTSO->id, CurrentTSO, CurrentProc);
983         goto next_thread;
984       } 
985       /* Ignore ContinueThreads for completed threads */
986       if (CurrentTSO->what_next == ThreadComplete) {
987         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
988               CurrentTSO->id, CurrentTSO, CurrentProc);
989         goto next_thread;
990       } 
991       /* Ignore ContinueThreads for threads that are being migrated */
992       if (PROCS(CurrentTSO)==Nowhere) { 
993         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
994               CurrentTSO->id, CurrentTSO, CurrentProc);
995         goto next_thread;
996       }
997       /* The thread should be at the beginning of the run queue */
998       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
999         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1000               CurrentTSO->id, CurrentTSO, CurrentProc);
1001         break; // run the thread anyway
1002       }
1003       /*
1004       new_event(proc, proc, CurrentTime[proc],
1005                 FindWork,
1006                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1007       goto next_thread; 
1008       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1009       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1010
1011     case FetchNode:
1012       do_the_fetchnode(event);
1013       goto next_thread;             /* handle next event in event queue  */
1014       
1015     case GlobalBlock:
1016       do_the_globalblock(event);
1017       goto next_thread;             /* handle next event in event queue  */
1018       
1019     case FetchReply:
1020       do_the_fetchreply(event);
1021       goto next_thread;             /* handle next event in event queue  */
1022       
1023     case UnblockThread:   /* Move from the blocked queue to the tail of */
1024       do_the_unblock(event);
1025       goto next_thread;             /* handle next event in event queue  */
1026       
1027     case ResumeThread:  /* Move from the blocked queue to the tail of */
1028       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1029       event->tso->gran.blocktime += 
1030         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1031       do_the_startthread(event);
1032       goto next_thread;             /* handle next event in event queue  */
1033       
1034     case StartThread:
1035       do_the_startthread(event);
1036       goto next_thread;             /* handle next event in event queue  */
1037       
1038     case MoveThread:
1039       do_the_movethread(event);
1040       goto next_thread;             /* handle next event in event queue  */
1041       
1042     case MoveSpark:
1043       do_the_movespark(event);
1044       goto next_thread;             /* handle next event in event queue  */
1045       
1046     case FindWork:
1047       do_the_findwork(event);
1048       goto next_thread;             /* handle next event in event queue  */
1049       
1050     default:
1051       barf("Illegal event type %u\n", event->evttype);
1052     }  /* switch */
1053     
1054     /* This point was scheduler_loop in the old RTS */
1055
1056     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1057
1058     TimeOfLastEvent = CurrentTime[CurrentProc];
1059     TimeOfNextEvent = get_time_of_next_event();
1060     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1061     // CurrentTSO = ThreadQueueHd;
1062
1063     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1064                          TimeOfNextEvent));
1065
1066     if (RtsFlags.GranFlags.Light) 
1067       GranSimLight_leave_system(event, &ActiveTSO); 
1068
1069     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1070
1071     IF_DEBUG(gran, 
1072              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1073
1074     /* in a GranSim setup the TSO stays on the run queue */
1075     t = CurrentTSO;
1076     /* Take a thread from the run queue. */
1077     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1078
1079     IF_DEBUG(gran, 
1080              debugBelch("GRAN: About to run current thread, which is\n");
1081              G_TSO(t,5));
1082
1083     context_switch = 0; // turned on via GranYield, checking events and time slice
1084
1085     IF_DEBUG(gran, 
1086              DumpGranEvent(GR_SCHEDULE, t));
1087
1088     procStatus[CurrentProc] = Busy;
1089 }
1090 #endif // GRAN
1091
1092 /* ----------------------------------------------------------------------------
1093  * Send pending messages (PARALLEL_HASKELL only)
1094  * ------------------------------------------------------------------------- */
1095
1096 #if defined(PARALLEL_HASKELL)
1097 static StgTSO *
1098 scheduleSendPendingMessages(void)
1099 {
1100     StgSparkPool *pool;
1101     rtsSpark spark;
1102     StgTSO *t;
1103
1104 # if defined(PAR) // global Mem.Mgmt., omit for now
1105     if (PendingFetches != END_BF_QUEUE) {
1106         processFetches();
1107     }
1108 # endif
1109     
1110     if (RtsFlags.ParFlags.BufferTime) {
1111         // if we use message buffering, we must send away all message
1112         // packets which have become too old...
1113         sendOldBuffers(); 
1114     }
1115 }
1116 #endif
1117
1118 /* ----------------------------------------------------------------------------
1119  * Activate spark threads (PARALLEL_HASKELL only)
1120  * ------------------------------------------------------------------------- */
1121
1122 #if defined(PARALLEL_HASKELL)
1123 static void
1124 scheduleActivateSpark(void)
1125 {
1126 #if defined(SPARKS)
1127   ASSERT(emptyRunQueue());
1128 /* We get here if the run queue is empty and want some work.
1129    We try to turn a spark into a thread, and add it to the run queue,
1130    from where it will be picked up in the next iteration of the scheduler
1131    loop.
1132 */
1133
1134       /* :-[  no local threads => look out for local sparks */
1135       /* the spark pool for the current PE */
1136       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1137       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1138           pool->hd < pool->tl) {
1139         /* 
1140          * ToDo: add GC code check that we really have enough heap afterwards!!
1141          * Old comment:
1142          * If we're here (no runnable threads) and we have pending
1143          * sparks, we must have a space problem.  Get enough space
1144          * to turn one of those pending sparks into a
1145          * thread... 
1146          */
1147
1148         spark = findSpark(rtsFalse);            /* get a spark */
1149         if (spark != (rtsSpark) NULL) {
1150           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1151           IF_PAR_DEBUG(fish, // schedule,
1152                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1153                              tso->id, tso, advisory_thread_count));
1154
1155           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1156             IF_PAR_DEBUG(fish, // schedule,
1157                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1158                             spark));
1159             return rtsFalse; /* failed to generate a thread */
1160           }                  /* otherwise fall through & pick-up new tso */
1161         } else {
1162           IF_PAR_DEBUG(fish, // schedule,
1163                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1164                              spark_queue_len(pool)));
1165           return rtsFalse;  /* failed to generate a thread */
1166         }
1167         return rtsTrue;  /* success in generating a thread */
1168   } else { /* no more threads permitted or pool empty */
1169     return rtsFalse;  /* failed to generateThread */
1170   }
1171 #else
1172   tso = NULL; // avoid compiler warning only
1173   return rtsFalse;  /* dummy in non-PAR setup */
1174 #endif // SPARKS
1175 }
1176 #endif // PARALLEL_HASKELL
1177
1178 /* ----------------------------------------------------------------------------
1179  * Get work from a remote node (PARALLEL_HASKELL only)
1180  * ------------------------------------------------------------------------- */
1181     
1182 #if defined(PARALLEL_HASKELL)
1183 static rtsBool
1184 scheduleGetRemoteWork(rtsBool *receivedFinish)
1185 {
1186   ASSERT(emptyRunQueue());
1187
1188   if (RtsFlags.ParFlags.BufferTime) {
1189         IF_PAR_DEBUG(verbose, 
1190                 debugBelch("...send all pending data,"));
1191         {
1192           nat i;
1193           for (i=1; i<=nPEs; i++)
1194             sendImmediately(i); // send all messages away immediately
1195         }
1196   }
1197 # ifndef SPARKS
1198         //++EDEN++ idle() , i.e. send all buffers, wait for work
1199         // suppress fishing in EDEN... just look for incoming messages
1200         // (blocking receive)
1201   IF_PAR_DEBUG(verbose, 
1202                debugBelch("...wait for incoming messages...\n"));
1203   *receivedFinish = processMessages(); // blocking receive...
1204
1205         // and reenter scheduling loop after having received something
1206         // (return rtsFalse below)
1207
1208 # else /* activate SPARKS machinery */
1209 /* We get here, if we have no work, tried to activate a local spark, but still
1210    have no work. We try to get a remote spark, by sending a FISH message.
1211    Thread migration should be added here, and triggered when a sequence of 
1212    fishes returns without work. */
1213         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1214
1215       /* =8-[  no local sparks => look for work on other PEs */
1216         /*
1217          * We really have absolutely no work.  Send out a fish
1218          * (there may be some out there already), and wait for
1219          * something to arrive.  We clearly can't run any threads
1220          * until a SCHEDULE or RESUME arrives, and so that's what
1221          * we're hoping to see.  (Of course, we still have to
1222          * respond to other types of messages.)
1223          */
1224         rtsTime now = msTime() /*CURRENT_TIME*/;
1225         IF_PAR_DEBUG(verbose, 
1226                      debugBelch("--  now=%ld\n", now));
1227         IF_PAR_DEBUG(fish, // verbose,
1228              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1229                  (last_fish_arrived_at!=0 &&
1230                   last_fish_arrived_at+delay > now)) {
1231                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1232                      now, last_fish_arrived_at+delay, 
1233                      last_fish_arrived_at,
1234                      delay);
1235              });
1236   
1237         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1238             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1239           if (last_fish_arrived_at==0 ||
1240               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1241             /* outstandingFishes is set in sendFish, processFish;
1242                avoid flooding system with fishes via delay */
1243     next_fish_to_send_at = 0;  
1244   } else {
1245     /* ToDo: this should be done in the main scheduling loop to avoid the
1246              busy wait here; not so bad if fish delay is very small  */
1247     int iq = 0; // DEBUGGING -- HWL
1248     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1249     /* send a fish when ready, but process messages that arrive in the meantime */
1250     do {
1251       if (PacketsWaiting()) {
1252         iq++; // DEBUGGING
1253         *receivedFinish = processMessages();
1254       }
1255       now = msTime();
1256     } while (!*receivedFinish || now<next_fish_to_send_at);
1257     // JB: This means the fish could become obsolete, if we receive
1258     // work. Better check for work again? 
1259     // last line: while (!receivedFinish || !haveWork || now<...)
1260     // next line: if (receivedFinish || haveWork )
1261
1262     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1263       return rtsFalse;  // NB: this will leave scheduler loop
1264                         // immediately after return!
1265                           
1266     IF_PAR_DEBUG(fish, // verbose,
1267                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1268
1269   }
1270
1271     // JB: IMHO, this should all be hidden inside sendFish(...)
1272     /* pe = choosePE(); 
1273        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1274                 NEW_FISH_HUNGER);
1275
1276     // Global statistics: count no. of fishes
1277     if (RtsFlags.ParFlags.ParStats.Global &&
1278          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1279            globalParStats.tot_fish_mess++;
1280            }
1281     */ 
1282
1283   /* delayed fishes must have been sent by now! */
1284   next_fish_to_send_at = 0;  
1285   }
1286       
1287   *receivedFinish = processMessages();
1288 # endif /* SPARKS */
1289
1290  return rtsFalse;
1291  /* NB: this function always returns rtsFalse, meaning the scheduler
1292     loop continues with the next iteration; 
1293     rationale: 
1294       return code means success in finding work; we enter this function
1295       if there is no local work, thus have to send a fish which takes
1296       time until it arrives with work; in the meantime we should process
1297       messages in the main loop;
1298  */
1299 }
1300 #endif // PARALLEL_HASKELL
1301
1302 /* ----------------------------------------------------------------------------
1303  * PAR/GRAN: Report stats & debugging info(?)
1304  * ------------------------------------------------------------------------- */
1305
1306 #if defined(PAR) || defined(GRAN)
1307 static void
1308 scheduleGranParReport(void)
1309 {
1310   ASSERT(run_queue_hd != END_TSO_QUEUE);
1311
1312   /* Take a thread from the run queue, if we have work */
1313   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1314
1315     /* If this TSO has got its outport closed in the meantime, 
1316      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1317      * It has to be marked as TH_DEAD for this purpose.
1318      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1319
1320 JB: TODO: investigate wether state change field could be nuked
1321      entirely and replaced by the normal tso state (whatnext
1322      field). All we want to do is to kill tsos from outside.
1323      */
1324
1325     /* ToDo: write something to the log-file
1326     if (RTSflags.ParFlags.granSimStats && !sameThread)
1327         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1328
1329     CurrentTSO = t;
1330     */
1331     /* the spark pool for the current PE */
1332     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1333
1334     IF_DEBUG(scheduler, 
1335              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1336                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1337
1338     IF_PAR_DEBUG(fish,
1339              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1340                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1341
1342     if (RtsFlags.ParFlags.ParStats.Full && 
1343         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1344         (emitSchedule || // forced emit
1345          (t && LastTSO && t->id != LastTSO->id))) {
1346       /* 
1347          we are running a different TSO, so write a schedule event to log file
1348          NB: If we use fair scheduling we also have to write  a deschedule 
1349              event for LastTSO; with unfair scheduling we know that the
1350              previous tso has blocked whenever we switch to another tso, so
1351              we don't need it in GUM for now
1352       */
1353       IF_PAR_DEBUG(fish, // schedule,
1354                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1355
1356       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1357                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1358       emitSchedule = rtsFalse;
1359     }
1360 }     
1361 #endif
1362
1363 /* ----------------------------------------------------------------------------
1364  * After running a thread...
1365  * ------------------------------------------------------------------------- */
1366
1367 static void
1368 schedulePostRunThread(void)
1369 {
1370 #if defined(PAR)
1371     /* HACK 675: if the last thread didn't yield, make sure to print a 
1372        SCHEDULE event to the log file when StgRunning the next thread, even
1373        if it is the same one as before */
1374     LastTSO = t; 
1375     TimeOfLastYield = CURRENT_TIME;
1376 #endif
1377
1378   /* some statistics gathering in the parallel case */
1379
1380 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1381   switch (ret) {
1382     case HeapOverflow:
1383 # if defined(GRAN)
1384       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1385       globalGranStats.tot_heapover++;
1386 # elif defined(PAR)
1387       globalParStats.tot_heapover++;
1388 # endif
1389       break;
1390
1391      case StackOverflow:
1392 # if defined(GRAN)
1393       IF_DEBUG(gran, 
1394                DumpGranEvent(GR_DESCHEDULE, t));
1395       globalGranStats.tot_stackover++;
1396 # elif defined(PAR)
1397       // IF_DEBUG(par, 
1398       // DumpGranEvent(GR_DESCHEDULE, t);
1399       globalParStats.tot_stackover++;
1400 # endif
1401       break;
1402
1403     case ThreadYielding:
1404 # if defined(GRAN)
1405       IF_DEBUG(gran, 
1406                DumpGranEvent(GR_DESCHEDULE, t));
1407       globalGranStats.tot_yields++;
1408 # elif defined(PAR)
1409       // IF_DEBUG(par, 
1410       // DumpGranEvent(GR_DESCHEDULE, t);
1411       globalParStats.tot_yields++;
1412 # endif
1413       break; 
1414
1415     case ThreadBlocked:
1416 # if defined(GRAN)
1417       IF_DEBUG(scheduler,
1418                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1419                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1420                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1421                if (t->block_info.closure!=(StgClosure*)NULL)
1422                  print_bq(t->block_info.closure);
1423                debugBelch("\n"));
1424
1425       // ??? needed; should emit block before
1426       IF_DEBUG(gran, 
1427                DumpGranEvent(GR_DESCHEDULE, t)); 
1428       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1429       /*
1430         ngoq Dogh!
1431       ASSERT(procStatus[CurrentProc]==Busy || 
1432               ((procStatus[CurrentProc]==Fetching) && 
1433               (t->block_info.closure!=(StgClosure*)NULL)));
1434       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1435           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1436             procStatus[CurrentProc]==Fetching)) 
1437         procStatus[CurrentProc] = Idle;
1438       */
1439 # elif defined(PAR)
1440 //++PAR++  blockThread() writes the event (change?)
1441 # endif
1442     break;
1443
1444   case ThreadFinished:
1445     break;
1446
1447   default:
1448     barf("parGlobalStats: unknown return code");
1449     break;
1450     }
1451 #endif
1452 }
1453
1454 /* -----------------------------------------------------------------------------
1455  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1456  * -------------------------------------------------------------------------- */
1457
1458 static rtsBool
1459 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1460 {
1461     // did the task ask for a large block?
1462     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1463         // if so, get one and push it on the front of the nursery.
1464         bdescr *bd;
1465         lnat blocks;
1466         
1467         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1468         
1469         IF_DEBUG(scheduler,
1470                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1471                             (long)t->id, whatNext_strs[t->what_next], blocks));
1472         
1473         // don't do this if the nursery is (nearly) full, we'll GC first.
1474         if (cap->r.rCurrentNursery->link != NULL ||
1475             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1476                                                // if the nursery has only one block.
1477             
1478             ACQUIRE_SM_LOCK
1479             bd = allocGroup( blocks );
1480             RELEASE_SM_LOCK
1481             cap->r.rNursery->n_blocks += blocks;
1482             
1483             // link the new group into the list
1484             bd->link = cap->r.rCurrentNursery;
1485             bd->u.back = cap->r.rCurrentNursery->u.back;
1486             if (cap->r.rCurrentNursery->u.back != NULL) {
1487                 cap->r.rCurrentNursery->u.back->link = bd;
1488             } else {
1489 #if !defined(SMP)
1490                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1491                        g0s0 == cap->r.rNursery);
1492 #endif
1493                 cap->r.rNursery->blocks = bd;
1494             }             
1495             cap->r.rCurrentNursery->u.back = bd;
1496             
1497             // initialise it as a nursery block.  We initialise the
1498             // step, gen_no, and flags field of *every* sub-block in
1499             // this large block, because this is easier than making
1500             // sure that we always find the block head of a large
1501             // block whenever we call Bdescr() (eg. evacuate() and
1502             // isAlive() in the GC would both have to do this, at
1503             // least).
1504             { 
1505                 bdescr *x;
1506                 for (x = bd; x < bd + blocks; x++) {
1507                     x->step = cap->r.rNursery;
1508                     x->gen_no = 0;
1509                     x->flags = 0;
1510                 }
1511             }
1512             
1513             // This assert can be a killer if the app is doing lots
1514             // of large block allocations.
1515             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1516             
1517             // now update the nursery to point to the new block
1518             cap->r.rCurrentNursery = bd;
1519             
1520             // we might be unlucky and have another thread get on the
1521             // run queue before us and steal the large block, but in that
1522             // case the thread will just end up requesting another large
1523             // block.
1524             pushOnRunQueue(cap,t);
1525             return rtsFalse;  /* not actually GC'ing */
1526         }
1527     }
1528     
1529     IF_DEBUG(scheduler,
1530              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1531                         (long)t->id, whatNext_strs[t->what_next]));
1532 #if defined(GRAN)
1533     ASSERT(!is_on_queue(t,CurrentProc));
1534 #elif defined(PARALLEL_HASKELL)
1535     /* Currently we emit a DESCHEDULE event before GC in GUM.
1536        ToDo: either add separate event to distinguish SYSTEM time from rest
1537        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1538     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1539         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1540                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1541         emitSchedule = rtsTrue;
1542     }
1543 #endif
1544       
1545     pushOnRunQueue(cap,t);
1546     return rtsTrue;
1547     /* actual GC is done at the end of the while loop in schedule() */
1548 }
1549
1550 /* -----------------------------------------------------------------------------
1551  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1552  * -------------------------------------------------------------------------- */
1553
1554 static void
1555 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1556 {
1557     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1558                                   (long)t->id, whatNext_strs[t->what_next]));
1559     /* just adjust the stack for this thread, then pop it back
1560      * on the run queue.
1561      */
1562     { 
1563         /* enlarge the stack */
1564         StgTSO *new_t = threadStackOverflow(cap, t);
1565         
1566         /* The TSO attached to this Task may have moved, so update the
1567          * pointer to it.
1568          */
1569         if (task->tso == t) {
1570             task->tso = new_t;
1571         }
1572         pushOnRunQueue(cap,new_t);
1573     }
1574 }
1575
1576 /* -----------------------------------------------------------------------------
1577  * Handle a thread that returned to the scheduler with ThreadYielding
1578  * -------------------------------------------------------------------------- */
1579
1580 static rtsBool
1581 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1582 {
1583     // Reset the context switch flag.  We don't do this just before
1584     // running the thread, because that would mean we would lose ticks
1585     // during GC, which can lead to unfair scheduling (a thread hogs
1586     // the CPU because the tick always arrives during GC).  This way
1587     // penalises threads that do a lot of allocation, but that seems
1588     // better than the alternative.
1589     context_switch = 0;
1590     
1591     /* put the thread back on the run queue.  Then, if we're ready to
1592      * GC, check whether this is the last task to stop.  If so, wake
1593      * up the GC thread.  getThread will block during a GC until the
1594      * GC is finished.
1595      */
1596     IF_DEBUG(scheduler,
1597              if (t->what_next != prev_what_next) {
1598                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1599                             (long)t->id, whatNext_strs[t->what_next]);
1600              } else {
1601                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1602                             (long)t->id, whatNext_strs[t->what_next]);
1603              }
1604         );
1605     
1606     IF_DEBUG(sanity,
1607              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1608              checkTSO(t));
1609     ASSERT(t->link == END_TSO_QUEUE);
1610     
1611     // Shortcut if we're just switching evaluators: don't bother
1612     // doing stack squeezing (which can be expensive), just run the
1613     // thread.
1614     if (t->what_next != prev_what_next) {
1615         return rtsTrue;
1616     }
1617     
1618 #if defined(GRAN)
1619     ASSERT(!is_on_queue(t,CurrentProc));
1620       
1621     IF_DEBUG(sanity,
1622              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1623              checkThreadQsSanity(rtsTrue));
1624
1625 #endif
1626
1627     addToRunQueue(cap,t);
1628
1629 #if defined(GRAN)
1630     /* add a ContinueThread event to actually process the thread */
1631     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1632               ContinueThread,
1633               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1634     IF_GRAN_DEBUG(bq, 
1635                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1636                   G_EVENTQ(0);
1637                   G_CURR_THREADQ(0));
1638 #endif
1639     return rtsFalse;
1640 }
1641
1642 /* -----------------------------------------------------------------------------
1643  * Handle a thread that returned to the scheduler with ThreadBlocked
1644  * -------------------------------------------------------------------------- */
1645
1646 static void
1647 scheduleHandleThreadBlocked( StgTSO *t
1648 #if !defined(GRAN) && !defined(DEBUG)
1649     STG_UNUSED
1650 #endif
1651     )
1652 {
1653 #if defined(GRAN)
1654     IF_DEBUG(scheduler,
1655              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1656                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1657              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1658     
1659     // ??? needed; should emit block before
1660     IF_DEBUG(gran, 
1661              DumpGranEvent(GR_DESCHEDULE, t)); 
1662     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1663     /*
1664       ngoq Dogh!
1665       ASSERT(procStatus[CurrentProc]==Busy || 
1666       ((procStatus[CurrentProc]==Fetching) && 
1667       (t->block_info.closure!=(StgClosure*)NULL)));
1668       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1669       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1670       procStatus[CurrentProc]==Fetching)) 
1671       procStatus[CurrentProc] = Idle;
1672     */
1673 #elif defined(PAR)
1674     IF_DEBUG(scheduler,
1675              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1676                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1677     IF_PAR_DEBUG(bq,
1678                  
1679                  if (t->block_info.closure!=(StgClosure*)NULL) 
1680                  print_bq(t->block_info.closure));
1681     
1682     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1683     blockThread(t);
1684     
1685     /* whatever we schedule next, we must log that schedule */
1686     emitSchedule = rtsTrue;
1687     
1688 #else /* !GRAN */
1689
1690       // We don't need to do anything.  The thread is blocked, and it
1691       // has tidied up its stack and placed itself on whatever queue
1692       // it needs to be on.
1693
1694 #if !defined(SMP)
1695     ASSERT(t->why_blocked != NotBlocked);
1696              // This might not be true under SMP: we don't have
1697              // exclusive access to this TSO, so someone might have
1698              // woken it up by now.  This actually happens: try
1699              // conc023 +RTS -N2.
1700 #endif
1701
1702     IF_DEBUG(scheduler,
1703              debugBelch("--<< thread %d (%s) stopped: ", 
1704                         t->id, whatNext_strs[t->what_next]);
1705              printThreadBlockage(t);
1706              debugBelch("\n"));
1707     
1708     /* Only for dumping event to log file 
1709        ToDo: do I need this in GranSim, too?
1710        blockThread(t);
1711     */
1712 #endif
1713 }
1714
1715 /* -----------------------------------------------------------------------------
1716  * Handle a thread that returned to the scheduler with ThreadFinished
1717  * -------------------------------------------------------------------------- */
1718
1719 static rtsBool
1720 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1721 {
1722     /* Need to check whether this was a main thread, and if so,
1723      * return with the return value.
1724      *
1725      * We also end up here if the thread kills itself with an
1726      * uncaught exception, see Exception.cmm.
1727      */
1728     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1729                                   t->id, whatNext_strs[t->what_next]));
1730
1731 #if defined(GRAN)
1732       endThread(t, CurrentProc); // clean-up the thread
1733 #elif defined(PARALLEL_HASKELL)
1734       /* For now all are advisory -- HWL */
1735       //if(t->priority==AdvisoryPriority) ??
1736       advisory_thread_count--; // JB: Caution with this counter, buggy!
1737       
1738 # if defined(DIST)
1739       if(t->dist.priority==RevalPriority)
1740         FinishReval(t);
1741 # endif
1742     
1743 # if defined(EDENOLD)
1744       // the thread could still have an outport... (BUG)
1745       if (t->eden.outport != -1) {
1746       // delete the outport for the tso which has finished...
1747         IF_PAR_DEBUG(eden_ports,
1748                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1749                               t->eden.outport, t->id));
1750         deleteOPT(t);
1751       }
1752       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1753       if (t->eden.epid != -1) {
1754         IF_PAR_DEBUG(eden_ports,
1755                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1756                            t->id, t->eden.epid));
1757         removeTSOfromProcess(t);
1758       }
1759 # endif 
1760
1761 # if defined(PAR)
1762       if (RtsFlags.ParFlags.ParStats.Full &&
1763           !RtsFlags.ParFlags.ParStats.Suppressed) 
1764         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1765
1766       //  t->par only contains statistics: left out for now...
1767       IF_PAR_DEBUG(fish,
1768                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1769                               t->id,t,t->par.sparkname));
1770 # endif
1771 #endif // PARALLEL_HASKELL
1772
1773       //
1774       // Check whether the thread that just completed was a bound
1775       // thread, and if so return with the result.  
1776       //
1777       // There is an assumption here that all thread completion goes
1778       // through this point; we need to make sure that if a thread
1779       // ends up in the ThreadKilled state, that it stays on the run
1780       // queue so it can be dealt with here.
1781       //
1782
1783       if (t->bound) {
1784
1785           if (t->bound != task) {
1786 #if !defined(THREADED_RTS)
1787               // Must be a bound thread that is not the topmost one.  Leave
1788               // it on the run queue until the stack has unwound to the
1789               // point where we can deal with this.  Leaving it on the run
1790               // queue also ensures that the garbage collector knows about
1791               // this thread and its return value (it gets dropped from the
1792               // all_threads list so there's no other way to find it).
1793               appendToRunQueue(cap,t);
1794               return rtsFalse;
1795 #else
1796               // this cannot happen in the threaded RTS, because a
1797               // bound thread can only be run by the appropriate Task.
1798               barf("finished bound thread that isn't mine");
1799 #endif
1800           }
1801
1802           ASSERT(task->tso == t);
1803
1804           if (t->what_next == ThreadComplete) {
1805               if (task->ret) {
1806                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1807                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1808               }
1809               task->stat = Success;
1810           } else {
1811               if (task->ret) {
1812                   *(task->ret) = NULL;
1813               }
1814               if (interrupted) {
1815                   task->stat = Interrupted;
1816               } else {
1817                   task->stat = Killed;
1818               }
1819           }
1820 #ifdef DEBUG
1821           removeThreadLabel((StgWord)task->tso->id);
1822 #endif
1823           return rtsTrue; // tells schedule() to return
1824       }
1825
1826       return rtsFalse;
1827 }
1828
1829 /* -----------------------------------------------------------------------------
1830  * Perform a heap census, if PROFILING
1831  * -------------------------------------------------------------------------- */
1832
1833 static rtsBool
1834 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1835 {
1836 #if defined(PROFILING)
1837     // When we have +RTS -i0 and we're heap profiling, do a census at
1838     // every GC.  This lets us get repeatable runs for debugging.
1839     if (performHeapProfile ||
1840         (RtsFlags.ProfFlags.profileInterval==0 &&
1841          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1842         GarbageCollect(GetRoots, rtsTrue);
1843         heapCensus();
1844         performHeapProfile = rtsFalse;
1845         return rtsTrue;  // true <=> we already GC'd
1846     }
1847 #endif
1848     return rtsFalse;
1849 }
1850
1851 /* -----------------------------------------------------------------------------
1852  * Perform a garbage collection if necessary
1853  * -------------------------------------------------------------------------- */
1854
1855 static void
1856 scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
1857 {
1858     StgTSO *t;
1859 #ifdef SMP
1860     static volatile StgWord waiting_for_gc;
1861     rtsBool was_waiting;
1862     nat i;
1863 #endif
1864
1865 #ifdef SMP
1866     // In order to GC, there must be no threads running Haskell code.
1867     // Therefore, the GC thread needs to hold *all* the capabilities,
1868     // and release them after the GC has completed.  
1869     //
1870     // This seems to be the simplest way: previous attempts involved
1871     // making all the threads with capabilities give up their
1872     // capabilities and sleep except for the *last* one, which
1873     // actually did the GC.  But it's quite hard to arrange for all
1874     // the other tasks to sleep and stay asleep.
1875     //
1876         
1877     was_waiting = cas(&waiting_for_gc, 0, 1);
1878     if (was_waiting) {
1879         do {
1880             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1881             yieldCapability(&cap,task);
1882         } while (waiting_for_gc);
1883         return;
1884     }
1885
1886     for (i=0; i < n_capabilities; i++) {
1887         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1888         if (cap != &capabilities[i]) {
1889             Capability *pcap = &capabilities[i];
1890             // we better hope this task doesn't get migrated to
1891             // another Capability while we're waiting for this one.
1892             // It won't, because load balancing happens while we have
1893             // all the Capabilities, but even so it's a slightly
1894             // unsavoury invariant.
1895             task->cap = pcap;
1896             context_switch = 1;
1897             waitForReturnCapability(&pcap, task);
1898             if (pcap != &capabilities[i]) {
1899                 barf("scheduleDoGC: got the wrong capability");
1900             }
1901         }
1902     }
1903
1904     waiting_for_gc = rtsFalse;
1905 #endif
1906
1907     /* Kick any transactions which are invalid back to their
1908      * atomically frames.  When next scheduled they will try to
1909      * commit, this commit will fail and they will retry.
1910      */
1911     { 
1912         StgTSO *next;
1913
1914         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1915             if (t->what_next == ThreadRelocated) {
1916                 next = t->link;
1917             } else {
1918                 next = t->global_link;
1919                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1920                     if (!stmValidateNestOfTransactions (t -> trec)) {
1921                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1922                         
1923                         // strip the stack back to the
1924                         // ATOMICALLY_FRAME, aborting the (nested)
1925                         // transaction, and saving the stack of any
1926                         // partially-evaluated thunks on the heap.
1927                         raiseAsync_(cap, t, NULL, rtsTrue);
1928                         
1929 #ifdef REG_R1
1930                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1931 #endif
1932                     }
1933                 }
1934             }
1935         }
1936     }
1937     
1938     // so this happens periodically:
1939     scheduleCheckBlackHoles(cap);
1940     
1941     IF_DEBUG(scheduler, printAllThreads());
1942
1943     /* everybody back, start the GC.
1944      * Could do it in this thread, or signal a condition var
1945      * to do it in another thread.  Either way, we need to
1946      * broadcast on gc_pending_cond afterward.
1947      */
1948 #if defined(THREADED_RTS)
1949     IF_DEBUG(scheduler,sched_belch("doing GC"));
1950 #endif
1951     GarbageCollect(GetRoots, force_major);
1952     
1953 #if defined(SMP)
1954     // release our stash of capabilities.
1955     for (i = 0; i < n_capabilities; i++) {
1956         if (cap != &capabilities[i]) {
1957             task->cap = &capabilities[i];
1958             releaseCapability(&capabilities[i]);
1959         }
1960     }
1961     task->cap = cap;
1962 #endif
1963
1964 #if defined(GRAN)
1965     /* add a ContinueThread event to continue execution of current thread */
1966     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1967               ContinueThread,
1968               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1969     IF_GRAN_DEBUG(bq, 
1970                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1971                   G_EVENTQ(0);
1972                   G_CURR_THREADQ(0));
1973 #endif /* GRAN */
1974 }
1975
1976 /* ---------------------------------------------------------------------------
1977  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1978  * used by Control.Concurrent for error checking.
1979  * ------------------------------------------------------------------------- */
1980  
1981 StgBool
1982 rtsSupportsBoundThreads(void)
1983 {
1984 #if defined(THREADED_RTS)
1985   return rtsTrue;
1986 #else
1987   return rtsFalse;
1988 #endif
1989 }
1990
1991 /* ---------------------------------------------------------------------------
1992  * isThreadBound(tso): check whether tso is bound to an OS thread.
1993  * ------------------------------------------------------------------------- */
1994  
1995 StgBool
1996 isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS)
1997 {
1998 #if defined(THREADED_RTS)
1999   return (tso->bound != NULL);
2000 #endif
2001   return rtsFalse;
2002 }
2003
2004 /* ---------------------------------------------------------------------------
2005  * Singleton fork(). Do not copy any running threads.
2006  * ------------------------------------------------------------------------- */
2007
2008 #if !defined(mingw32_HOST_OS) && !defined(SMP)
2009 #define FORKPROCESS_PRIMOP_SUPPORTED
2010 #endif
2011
2012 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2013 static void 
2014 deleteThreadImmediately(Capability *cap, StgTSO *tso);
2015 #endif
2016 StgInt
2017 forkProcess(HsStablePtr *entry
2018 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2019             STG_UNUSED
2020 #endif
2021            )
2022 {
2023 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2024     Task *task;
2025     pid_t pid;
2026     StgTSO* t,*next;
2027     Capability *cap;
2028     
2029     IF_DEBUG(scheduler,sched_belch("forking!"));
2030     
2031     // ToDo: for SMP, we should probably acquire *all* the capabilities
2032     cap = rts_lock();
2033     
2034     pid = fork();
2035     
2036     if (pid) { // parent
2037         
2038         // just return the pid
2039         rts_unlock(cap);
2040         return pid;
2041         
2042     } else { // child
2043         
2044         // delete all threads
2045         cap->run_queue_hd = END_TSO_QUEUE;
2046         cap->run_queue_tl = END_TSO_QUEUE;
2047         
2048         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2049             next = t->link;
2050             
2051             // don't allow threads to catch the ThreadKilled exception
2052             deleteThreadImmediately(cap,t);
2053         }
2054         
2055         // wipe the task list
2056         ACQUIRE_LOCK(&sched_mutex);
2057         for (task = all_tasks; task != NULL; task=task->all_link) {
2058             if (task != cap->running_task) discardTask(task);
2059         }
2060         RELEASE_LOCK(&sched_mutex);
2061
2062         // wipe our spare workers list.
2063         cap->spare_workers = NULL;
2064
2065         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2066         rts_checkSchedStatus("forkProcess",cap);
2067         
2068         rts_unlock(cap);
2069         hs_exit();                      // clean up and exit
2070         stg_exit(EXIT_SUCCESS);
2071     }
2072 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2073     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2074     return -1;
2075 #endif
2076 }
2077
2078 /* ---------------------------------------------------------------------------
2079  * Delete the threads on the run queue of the current capability.
2080  * ------------------------------------------------------------------------- */
2081    
2082 static void
2083 deleteRunQueue (Capability *cap)
2084 {
2085     StgTSO *t, *next;
2086     for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) {
2087         ASSERT(t->what_next != ThreadRelocated);
2088         next = t->link;
2089         deleteThread(cap, t);
2090     }
2091 }
2092
2093 /* startThread and  insertThread are now in GranSim.c -- HWL */
2094
2095
2096 /* -----------------------------------------------------------------------------
2097    Managing the suspended_ccalling_tasks list.
2098    Locks required: sched_mutex
2099    -------------------------------------------------------------------------- */
2100
2101 STATIC_INLINE void
2102 suspendTask (Capability *cap, Task *task)
2103 {
2104     ASSERT(task->next == NULL && task->prev == NULL);
2105     task->next = cap->suspended_ccalling_tasks;
2106     task->prev = NULL;
2107     if (cap->suspended_ccalling_tasks) {
2108         cap->suspended_ccalling_tasks->prev = task;
2109     }
2110     cap->suspended_ccalling_tasks = task;
2111 }
2112
2113 STATIC_INLINE void
2114 recoverSuspendedTask (Capability *cap, Task *task)
2115 {
2116     if (task->prev) {
2117         task->prev->next = task->next;
2118     } else {
2119         ASSERT(cap->suspended_ccalling_tasks == task);
2120         cap->suspended_ccalling_tasks = task->next;
2121     }
2122     if (task->next) {
2123         task->next->prev = task->prev;
2124     }
2125     task->next = task->prev = NULL;
2126 }
2127
2128 /* ---------------------------------------------------------------------------
2129  * Suspending & resuming Haskell threads.
2130  * 
2131  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2132  * its capability before calling the C function.  This allows another
2133  * task to pick up the capability and carry on running Haskell
2134  * threads.  It also means that if the C call blocks, it won't lock
2135  * the whole system.
2136  *
2137  * The Haskell thread making the C call is put to sleep for the
2138  * duration of the call, on the susepended_ccalling_threads queue.  We
2139  * give out a token to the task, which it can use to resume the thread
2140  * on return from the C function.
2141  * ------------------------------------------------------------------------- */
2142    
2143 void *
2144 suspendThread (StgRegTable *reg)
2145 {
2146   Capability *cap;
2147   int saved_errno = errno;
2148   StgTSO *tso;
2149   Task *task;
2150
2151   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2152    */
2153   cap = regTableToCapability(reg);
2154
2155   task = cap->running_task;
2156   tso = cap->r.rCurrentTSO;
2157
2158   IF_DEBUG(scheduler,
2159            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2160
2161   // XXX this might not be necessary --SDM
2162   tso->what_next = ThreadRunGHC;
2163
2164   threadPaused(tso);
2165
2166   if(tso->blocked_exceptions == NULL)  {
2167       tso->why_blocked = BlockedOnCCall;
2168       tso->blocked_exceptions = END_TSO_QUEUE;
2169   } else {
2170       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2171   }
2172
2173   // Hand back capability
2174   task->suspended_tso = tso;
2175
2176   ACQUIRE_LOCK(&cap->lock);
2177
2178   suspendTask(cap,task);
2179   cap->in_haskell = rtsFalse;
2180   releaseCapability_(cap);
2181   
2182   RELEASE_LOCK(&cap->lock);
2183
2184 #if defined(THREADED_RTS)
2185   /* Preparing to leave the RTS, so ensure there's a native thread/task
2186      waiting to take over.
2187   */
2188   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2189 #endif
2190
2191   errno = saved_errno;
2192   return task;
2193 }
2194
2195 StgRegTable *
2196 resumeThread (void *task_)
2197 {
2198     StgTSO *tso;
2199     Capability *cap;
2200     int saved_errno = errno;
2201     Task *task = task_;
2202
2203     cap = task->cap;
2204     // Wait for permission to re-enter the RTS with the result.
2205     waitForReturnCapability(&cap,task);
2206     // we might be on a different capability now... but if so, our
2207     // entry on the suspended_ccalling_tasks list will also have been
2208     // migrated.
2209
2210     // Remove the thread from the suspended list
2211     recoverSuspendedTask(cap,task);
2212
2213     tso = task->suspended_tso;
2214     task->suspended_tso = NULL;
2215     tso->link = END_TSO_QUEUE;
2216     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2217     
2218     if (tso->why_blocked == BlockedOnCCall) {
2219         awakenBlockedQueue(cap,tso->blocked_exceptions);
2220         tso->blocked_exceptions = NULL;
2221     }
2222     
2223     /* Reset blocking status */
2224     tso->why_blocked  = NotBlocked;
2225     
2226     cap->r.rCurrentTSO = tso;
2227     cap->in_haskell = rtsTrue;
2228     errno = saved_errno;
2229
2230     return &cap->r;
2231 }
2232
2233 /* ---------------------------------------------------------------------------
2234  * Comparing Thread ids.
2235  *
2236  * This is used from STG land in the implementation of the
2237  * instances of Eq/Ord for ThreadIds.
2238  * ------------------------------------------------------------------------ */
2239
2240 int
2241 cmp_thread(StgPtr tso1, StgPtr tso2) 
2242
2243   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2244   StgThreadID id2 = ((StgTSO *)tso2)->id;
2245  
2246   if (id1 < id2) return (-1);
2247   if (id1 > id2) return 1;
2248   return 0;
2249 }
2250
2251 /* ---------------------------------------------------------------------------
2252  * Fetching the ThreadID from an StgTSO.
2253  *
2254  * This is used in the implementation of Show for ThreadIds.
2255  * ------------------------------------------------------------------------ */
2256 int
2257 rts_getThreadId(StgPtr tso) 
2258 {
2259   return ((StgTSO *)tso)->id;
2260 }
2261
2262 #ifdef DEBUG
2263 void
2264 labelThread(StgPtr tso, char *label)
2265 {
2266   int len;
2267   void *buf;
2268
2269   /* Caveat: Once set, you can only set the thread name to "" */
2270   len = strlen(label)+1;
2271   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2272   strncpy(buf,label,len);
2273   /* Update will free the old memory for us */
2274   updateThreadLabel(((StgTSO *)tso)->id,buf);
2275 }
2276 #endif /* DEBUG */
2277
2278 /* ---------------------------------------------------------------------------
2279    Create a new thread.
2280
2281    The new thread starts with the given stack size.  Before the
2282    scheduler can run, however, this thread needs to have a closure
2283    (and possibly some arguments) pushed on its stack.  See
2284    pushClosure() in Schedule.h.
2285
2286    createGenThread() and createIOThread() (in SchedAPI.h) are
2287    convenient packaged versions of this function.
2288
2289    currently pri (priority) is only used in a GRAN setup -- HWL
2290    ------------------------------------------------------------------------ */
2291 #if defined(GRAN)
2292 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2293 StgTSO *
2294 createThread(nat size, StgInt pri)
2295 #else
2296 StgTSO *
2297 createThread(Capability *cap, nat size)
2298 #endif
2299 {
2300     StgTSO *tso;
2301     nat stack_size;
2302
2303     /* sched_mutex is *not* required */
2304
2305     /* First check whether we should create a thread at all */
2306 #if defined(PARALLEL_HASKELL)
2307     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2308     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2309         threadsIgnored++;
2310         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2311                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2312         return END_TSO_QUEUE;
2313     }
2314     threadsCreated++;
2315 #endif
2316
2317 #if defined(GRAN)
2318     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2319 #endif
2320
2321     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2322
2323     /* catch ridiculously small stack sizes */
2324     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2325         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2326     }
2327
2328     stack_size = size - TSO_STRUCT_SIZEW;
2329     
2330     tso = (StgTSO *)allocateLocal(cap, size);
2331     TICK_ALLOC_TSO(stack_size, 0);
2332
2333     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2334 #if defined(GRAN)
2335     SET_GRAN_HDR(tso, ThisPE);
2336 #endif
2337
2338     // Always start with the compiled code evaluator
2339     tso->what_next = ThreadRunGHC;
2340
2341     tso->why_blocked  = NotBlocked;
2342     tso->blocked_exceptions = NULL;
2343     
2344     tso->saved_errno = 0;
2345     tso->bound = NULL;
2346     
2347     tso->stack_size     = stack_size;
2348     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2349                           - TSO_STRUCT_SIZEW;
2350     tso->sp             = (P_)&(tso->stack) + stack_size;
2351
2352     tso->trec = NO_TREC;
2353     
2354 #ifdef PROFILING
2355     tso->prof.CCCS = CCS_MAIN;
2356 #endif
2357     
2358   /* put a stop frame on the stack */
2359     tso->sp -= sizeofW(StgStopFrame);
2360     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2361     tso->link = END_TSO_QUEUE;
2362     
2363   // ToDo: check this
2364 #if defined(GRAN)
2365     /* uses more flexible routine in GranSim */
2366     insertThread(tso, CurrentProc);
2367 #else
2368     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2369      * from its creation
2370      */
2371 #endif
2372     
2373 #if defined(GRAN) 
2374     if (RtsFlags.GranFlags.GranSimStats.Full) 
2375         DumpGranEvent(GR_START,tso);
2376 #elif defined(PARALLEL_HASKELL)
2377     if (RtsFlags.ParFlags.ParStats.Full) 
2378         DumpGranEvent(GR_STARTQ,tso);
2379     /* HACk to avoid SCHEDULE 
2380        LastTSO = tso; */
2381 #endif
2382     
2383     /* Link the new thread on the global thread list.
2384      */
2385     ACQUIRE_LOCK(&sched_mutex);
2386     tso->id = next_thread_id++;  // while we have the mutex
2387     tso->global_link = all_threads;
2388     all_threads = tso;
2389     RELEASE_LOCK(&sched_mutex);
2390     
2391 #if defined(DIST)
2392     tso->dist.priority = MandatoryPriority; //by default that is...
2393 #endif
2394     
2395 #if defined(GRAN)
2396     tso->gran.pri = pri;
2397 # if defined(DEBUG)
2398     tso->gran.magic = TSO_MAGIC; // debugging only
2399 # endif
2400     tso->gran.sparkname   = 0;
2401     tso->gran.startedat   = CURRENT_TIME; 
2402     tso->gran.exported    = 0;
2403     tso->gran.basicblocks = 0;
2404     tso->gran.allocs      = 0;
2405     tso->gran.exectime    = 0;
2406     tso->gran.fetchtime   = 0;
2407     tso->gran.fetchcount  = 0;
2408     tso->gran.blocktime   = 0;
2409     tso->gran.blockcount  = 0;
2410     tso->gran.blockedat   = 0;
2411     tso->gran.globalsparks = 0;
2412     tso->gran.localsparks  = 0;
2413     if (RtsFlags.GranFlags.Light)
2414         tso->gran.clock  = Now; /* local clock */
2415     else
2416         tso->gran.clock  = 0;
2417     
2418     IF_DEBUG(gran,printTSO(tso));
2419 #elif defined(PARALLEL_HASKELL)
2420 # if defined(DEBUG)
2421     tso->par.magic = TSO_MAGIC; // debugging only
2422 # endif
2423     tso->par.sparkname   = 0;
2424     tso->par.startedat   = CURRENT_TIME; 
2425     tso->par.exported    = 0;
2426     tso->par.basicblocks = 0;
2427     tso->par.allocs      = 0;
2428     tso->par.exectime    = 0;
2429     tso->par.fetchtime   = 0;
2430     tso->par.fetchcount  = 0;
2431     tso->par.blocktime   = 0;
2432     tso->par.blockcount  = 0;
2433     tso->par.blockedat   = 0;
2434     tso->par.globalsparks = 0;
2435     tso->par.localsparks  = 0;
2436 #endif
2437     
2438 #if defined(GRAN)
2439     globalGranStats.tot_threads_created++;
2440     globalGranStats.threads_created_on_PE[CurrentProc]++;
2441     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2442     globalGranStats.tot_sq_probes++;
2443 #elif defined(PARALLEL_HASKELL)
2444     // collect parallel global statistics (currently done together with GC stats)
2445     if (RtsFlags.ParFlags.ParStats.Global &&
2446         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2447         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2448         globalParStats.tot_threads_created++;
2449     }
2450 #endif 
2451     
2452 #if defined(GRAN)
2453     IF_GRAN_DEBUG(pri,
2454                   sched_belch("==__ schedule: Created TSO %d (%p);",
2455                               CurrentProc, tso, tso->id));
2456 #elif defined(PARALLEL_HASKELL)
2457     IF_PAR_DEBUG(verbose,
2458                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2459                              (long)tso->id, tso, advisory_thread_count));
2460 #else
2461     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2462                                    (long)tso->id, (long)tso->stack_size));
2463 #endif    
2464     return tso;
2465 }
2466
2467 #if defined(PAR)
2468 /* RFP:
2469    all parallel thread creation calls should fall through the following routine.
2470 */
2471 StgTSO *
2472 createThreadFromSpark(rtsSpark spark) 
2473 { StgTSO *tso;
2474   ASSERT(spark != (rtsSpark)NULL);
2475 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2476   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2477   { threadsIgnored++;
2478     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2479           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2480     return END_TSO_QUEUE;
2481   }
2482   else
2483   { threadsCreated++;
2484     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2485     if (tso==END_TSO_QUEUE)     
2486       barf("createSparkThread: Cannot create TSO");
2487 #if defined(DIST)
2488     tso->priority = AdvisoryPriority;
2489 #endif
2490     pushClosure(tso,spark);
2491     addToRunQueue(tso);
2492     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2493   }
2494   return tso;
2495 }
2496 #endif
2497
2498 /*
2499   Turn a spark into a thread.
2500   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2501 */
2502 #if 0
2503 StgTSO *
2504 activateSpark (rtsSpark spark) 
2505 {
2506   StgTSO *tso;
2507
2508   tso = createSparkThread(spark);
2509   if (RtsFlags.ParFlags.ParStats.Full) {   
2510     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2511       IF_PAR_DEBUG(verbose,
2512                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2513                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2514   }
2515   // ToDo: fwd info on local/global spark to thread -- HWL
2516   // tso->gran.exported =  spark->exported;
2517   // tso->gran.locked =   !spark->global;
2518   // tso->gran.sparkname = spark->name;
2519
2520   return tso;
2521 }
2522 #endif
2523
2524 /* ---------------------------------------------------------------------------
2525  * scheduleThread()
2526  *
2527  * scheduleThread puts a thread on the end  of the runnable queue.
2528  * This will usually be done immediately after a thread is created.
2529  * The caller of scheduleThread must create the thread using e.g.
2530  * createThread and push an appropriate closure
2531  * on this thread's stack before the scheduler is invoked.
2532  * ------------------------------------------------------------------------ */
2533
2534 void
2535 scheduleThread(Capability *cap, StgTSO *tso)
2536 {
2537     // The thread goes at the *end* of the run-queue, to avoid possible
2538     // starvation of any threads already on the queue.
2539     appendToRunQueue(cap,tso);
2540 }
2541
2542 Capability *
2543 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2544 {
2545     Task *task;
2546
2547     // We already created/initialised the Task
2548     task = cap->running_task;
2549
2550     // This TSO is now a bound thread; make the Task and TSO
2551     // point to each other.
2552     tso->bound = task;
2553
2554     task->tso = tso;
2555     task->ret = ret;
2556     task->stat = NoStatus;
2557
2558     appendToRunQueue(cap,tso);
2559
2560     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2561
2562 #if defined(GRAN)
2563     /* GranSim specific init */
2564     CurrentTSO = m->tso;                // the TSO to run
2565     procStatus[MainProc] = Busy;        // status of main PE
2566     CurrentProc = MainProc;             // PE to run it on
2567 #endif
2568
2569     cap = schedule(cap,task);
2570
2571     ASSERT(task->stat != NoStatus);
2572     ASSERT_CAPABILITY_INVARIANTS(cap,task);
2573
2574     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2575     return cap;
2576 }
2577
2578 /* ----------------------------------------------------------------------------
2579  * Starting Tasks
2580  * ------------------------------------------------------------------------- */
2581
2582 #if defined(THREADED_RTS)
2583 void
2584 workerStart(Task *task)
2585 {
2586     Capability *cap;
2587
2588     // See startWorkerTask().
2589     ACQUIRE_LOCK(&task->lock);
2590     cap = task->cap;
2591     RELEASE_LOCK(&task->lock);
2592
2593     // set the thread-local pointer to the Task:
2594     taskEnter(task);
2595
2596     // schedule() runs without a lock.
2597     cap = schedule(cap,task);
2598
2599     // On exit from schedule(), we have a Capability.
2600     releaseCapability(cap);
2601     taskStop(task);
2602 }
2603 #endif
2604
2605 /* ---------------------------------------------------------------------------
2606  * initScheduler()
2607  *
2608  * Initialise the scheduler.  This resets all the queues - if the
2609  * queues contained any threads, they'll be garbage collected at the
2610  * next pass.
2611  *
2612  * ------------------------------------------------------------------------ */
2613
2614 void 
2615 initScheduler(void)
2616 {
2617 #if defined(GRAN)
2618   nat i;
2619   for (i=0; i<=MAX_PROC; i++) {
2620     run_queue_hds[i]      = END_TSO_QUEUE;
2621     run_queue_tls[i]      = END_TSO_QUEUE;
2622     blocked_queue_hds[i]  = END_TSO_QUEUE;
2623     blocked_queue_tls[i]  = END_TSO_QUEUE;
2624     ccalling_threadss[i]  = END_TSO_QUEUE;
2625     blackhole_queue[i]    = END_TSO_QUEUE;
2626     sleeping_queue        = END_TSO_QUEUE;
2627   }
2628 #elif !defined(THREADED_RTS)
2629   blocked_queue_hd  = END_TSO_QUEUE;
2630   blocked_queue_tl  = END_TSO_QUEUE;
2631   sleeping_queue    = END_TSO_QUEUE;
2632 #endif
2633
2634   blackhole_queue   = END_TSO_QUEUE;
2635   all_threads       = END_TSO_QUEUE;
2636
2637   context_switch = 0;
2638   interrupted    = 0;
2639
2640   RtsFlags.ConcFlags.ctxtSwitchTicks =
2641       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2642       
2643 #if defined(THREADED_RTS)
2644   /* Initialise the mutex and condition variables used by
2645    * the scheduler. */
2646   initMutex(&sched_mutex);
2647 #endif
2648   
2649   ACQUIRE_LOCK(&sched_mutex);
2650
2651   /* A capability holds the state a native thread needs in
2652    * order to execute STG code. At least one capability is
2653    * floating around (only SMP builds have more than one).
2654    */
2655   initCapabilities();
2656
2657   initTaskManager();
2658
2659 #if defined(SMP)
2660   /*
2661    * Eagerly start one worker to run each Capability, except for
2662    * Capability 0.  The idea is that we're probably going to start a
2663    * bound thread on Capability 0 pretty soon, so we don't want a
2664    * worker task hogging it.
2665    */
2666   { 
2667       nat i;
2668       Capability *cap;
2669       for (i = 1; i < n_capabilities; i++) {
2670           cap = &capabilities[i];
2671           ACQUIRE_LOCK(&cap->lock);
2672           startWorkerTask(cap, workerStart);
2673           RELEASE_LOCK(&cap->lock);
2674       }
2675   }
2676 #endif
2677
2678 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2679   initSparkPools();
2680 #endif
2681
2682   RELEASE_LOCK(&sched_mutex);
2683 }
2684
2685 void
2686 exitScheduler( void )
2687 {
2688     interrupted = rtsTrue;
2689     shutting_down_scheduler = rtsTrue;
2690
2691 #if defined(THREADED_RTS)
2692     { 
2693         Task *task;
2694         nat i;
2695         
2696         ACQUIRE_LOCK(&sched_mutex);
2697         task = newBoundTask();
2698         RELEASE_LOCK(&sched_mutex);
2699
2700         for (i = 0; i < n_capabilities; i++) {
2701             shutdownCapability(&capabilities[i], task);
2702         }
2703         boundTaskExiting(task);
2704         stopTaskManager();
2705     }
2706 #endif
2707 }
2708
2709 /* ---------------------------------------------------------------------------
2710    Where are the roots that we know about?
2711
2712         - all the threads on the runnable queue
2713         - all the threads on the blocked queue
2714         - all the threads on the sleeping queue
2715         - all the thread currently executing a _ccall_GC
2716         - all the "main threads"
2717      
2718    ------------------------------------------------------------------------ */
2719
2720 /* This has to be protected either by the scheduler monitor, or by the
2721         garbage collection monitor (probably the latter).
2722         KH @ 25/10/99
2723 */
2724
2725 void
2726 GetRoots( evac_fn evac )
2727 {
2728     nat i;
2729     Capability *cap;
2730     Task *task;
2731
2732 #if defined(GRAN)
2733     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2734         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2735             evac((StgClosure **)&run_queue_hds[i]);
2736         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2737             evac((StgClosure **)&run_queue_tls[i]);
2738         
2739         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2740             evac((StgClosure **)&blocked_queue_hds[i]);
2741         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2742             evac((StgClosure **)&blocked_queue_tls[i]);
2743         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2744             evac((StgClosure **)&ccalling_threads[i]);
2745     }
2746
2747     markEventQueue();
2748
2749 #else /* !GRAN */
2750
2751     for (i = 0; i < n_capabilities; i++) {
2752         cap = &capabilities[i];
2753         evac((StgClosure **)&cap->run_queue_hd);
2754         evac((StgClosure **)&cap->run_queue_tl);
2755         
2756         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2757              task=task->next) {
2758             evac((StgClosure **)&task->suspended_tso);
2759         }
2760     }
2761     
2762 #if !defined(THREADED_RTS)
2763     evac((StgClosure **)&blocked_queue_hd);
2764     evac((StgClosure **)&blocked_queue_tl);
2765     evac((StgClosure **)&sleeping_queue);
2766 #endif 
2767 #endif
2768
2769     evac((StgClosure **)&blackhole_queue);
2770
2771 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2772     markSparkQueue(evac);
2773 #endif
2774     
2775 #if defined(RTS_USER_SIGNALS)
2776     // mark the signal handlers (signals should be already blocked)
2777     markSignalHandlers(evac);
2778 #endif
2779 }
2780
2781 /* -----------------------------------------------------------------------------
2782    performGC
2783
2784    This is the interface to the garbage collector from Haskell land.
2785    We provide this so that external C code can allocate and garbage
2786    collect when called from Haskell via _ccall_GC.
2787
2788    It might be useful to provide an interface whereby the programmer
2789    can specify more roots (ToDo).
2790    
2791    This needs to be protected by the GC condition variable above.  KH.
2792    -------------------------------------------------------------------------- */
2793
2794 static void (*extra_roots)(evac_fn);
2795
2796 void
2797 performGC(void)
2798 {
2799 #ifdef THREADED_RTS
2800     // ToDo: we have to grab all the capabilities here.
2801     errorBelch("performGC not supported in threaded RTS (yet)");
2802     stg_exit(EXIT_FAILURE);
2803 #endif
2804     /* Obligated to hold this lock upon entry */
2805     GarbageCollect(GetRoots,rtsFalse);
2806 }
2807
2808 void
2809 performMajorGC(void)
2810 {
2811 #ifdef THREADED_RTS
2812     errorBelch("performMayjorGC not supported in threaded RTS (yet)");
2813     stg_exit(EXIT_FAILURE);
2814 #endif
2815     GarbageCollect(GetRoots,rtsTrue);
2816 }
2817
2818 static void
2819 AllRoots(evac_fn evac)
2820 {
2821     GetRoots(evac);             // the scheduler's roots
2822     extra_roots(evac);          // the user's roots
2823 }
2824
2825 void
2826 performGCWithRoots(void (*get_roots)(evac_fn))
2827 {
2828 #ifdef THREADED_RTS
2829     errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
2830     stg_exit(EXIT_FAILURE);
2831 #endif
2832     extra_roots = get_roots;
2833     GarbageCollect(AllRoots,rtsFalse);
2834 }
2835
2836 /* -----------------------------------------------------------------------------
2837    Stack overflow
2838
2839    If the thread has reached its maximum stack size, then raise the
2840    StackOverflow exception in the offending thread.  Otherwise
2841    relocate the TSO into a larger chunk of memory and adjust its stack
2842    size appropriately.
2843    -------------------------------------------------------------------------- */
2844
2845 static StgTSO *
2846 threadStackOverflow(Capability *cap, StgTSO *tso)
2847 {
2848   nat new_stack_size, stack_words;
2849   lnat new_tso_size;
2850   StgPtr new_sp;
2851   StgTSO *dest;
2852
2853   IF_DEBUG(sanity,checkTSO(tso));
2854   if (tso->stack_size >= tso->max_stack_size) {
2855
2856     IF_DEBUG(gc,
2857              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2858                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2859              /* If we're debugging, just print out the top of the stack */
2860              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2861                                               tso->sp+64)));
2862
2863     /* Send this thread the StackOverflow exception */
2864     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
2865     return tso;
2866   }
2867
2868   /* Try to double the current stack size.  If that takes us over the
2869    * maximum stack size for this thread, then use the maximum instead.
2870    * Finally round up so the TSO ends up as a whole number of blocks.
2871    */
2872   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2873   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2874                                        TSO_STRUCT_SIZE)/sizeof(W_);
2875   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2876   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2877
2878   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
2879
2880   dest = (StgTSO *)allocate(new_tso_size);
2881   TICK_ALLOC_TSO(new_stack_size,0);
2882
2883   /* copy the TSO block and the old stack into the new area */
2884   memcpy(dest,tso,TSO_STRUCT_SIZE);
2885   stack_words = tso->stack + tso->stack_size - tso->sp;
2886   new_sp = (P_)dest + new_tso_size - stack_words;
2887   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2888
2889   /* relocate the stack pointers... */
2890   dest->sp         = new_sp;
2891   dest->stack_size = new_stack_size;
2892         
2893   /* Mark the old TSO as relocated.  We have to check for relocated
2894    * TSOs in the garbage collector and any primops that deal with TSOs.
2895    *
2896    * It's important to set the sp value to just beyond the end
2897    * of the stack, so we don't attempt to scavenge any part of the
2898    * dead TSO's stack.
2899    */
2900   tso->what_next = ThreadRelocated;
2901   tso->link = dest;
2902   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2903   tso->why_blocked = NotBlocked;
2904
2905   IF_PAR_DEBUG(verbose,
2906                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2907                      tso->id, tso, tso->stack_size);
2908                /* If we're debugging, just print out the top of the stack */
2909                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2910                                                 tso->sp+64)));
2911   
2912   IF_DEBUG(sanity,checkTSO(tso));
2913 #if 0
2914   IF_DEBUG(scheduler,printTSO(dest));
2915 #endif
2916
2917   return dest;
2918 }
2919
2920 /* ---------------------------------------------------------------------------
2921    Wake up a queue that was blocked on some resource.
2922    ------------------------------------------------------------------------ */
2923
2924 #if defined(GRAN)
2925 STATIC_INLINE void
2926 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2927 {
2928 }
2929 #elif defined(PARALLEL_HASKELL)
2930 STATIC_INLINE void
2931 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2932 {
2933   /* write RESUME events to log file and
2934      update blocked and fetch time (depending on type of the orig closure) */
2935   if (RtsFlags.ParFlags.ParStats.Full) {
2936     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2937                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2938                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2939     if (emptyRunQueue())
2940       emitSchedule = rtsTrue;
2941
2942     switch (get_itbl(node)->type) {
2943         case FETCH_ME_BQ:
2944           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2945           break;
2946         case RBH:
2947         case FETCH_ME:
2948         case BLACKHOLE_BQ:
2949           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2950           break;
2951 #ifdef DIST
2952         case MVAR:
2953           break;
2954 #endif    
2955         default:
2956           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
2957         }
2958       }
2959 }
2960 #endif
2961
2962 #if defined(GRAN)
2963 StgBlockingQueueElement *
2964 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2965 {
2966     StgTSO *tso;
2967     PEs node_loc, tso_loc;
2968
2969     node_loc = where_is(node); // should be lifted out of loop
2970     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2971     tso_loc = where_is((StgClosure *)tso);
2972     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2973       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2974       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2975       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2976       // insertThread(tso, node_loc);
2977       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2978                 ResumeThread,
2979                 tso, node, (rtsSpark*)NULL);
2980       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2981       // len_local++;
2982       // len++;
2983     } else { // TSO is remote (actually should be FMBQ)
2984       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2985                                   RtsFlags.GranFlags.Costs.gunblocktime +
2986                                   RtsFlags.GranFlags.Costs.latency;
2987       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2988                 UnblockThread,
2989                 tso, node, (rtsSpark*)NULL);
2990       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2991       // len++;
2992     }
2993     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2994     IF_GRAN_DEBUG(bq,
2995                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2996                           (node_loc==tso_loc ? "Local" : "Global"), 
2997                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2998     tso->block_info.closure = NULL;
2999     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3000                              tso->id, tso));
3001 }
3002 #elif defined(PARALLEL_HASKELL)
3003 StgBlockingQueueElement *
3004 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3005 {
3006     StgBlockingQueueElement *next;
3007
3008     switch (get_itbl(bqe)->type) {
3009     case TSO:
3010       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3011       /* if it's a TSO just push it onto the run_queue */
3012       next = bqe->link;
3013       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3014       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3015       threadRunnable();
3016       unblockCount(bqe, node);
3017       /* reset blocking status after dumping event */
3018       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3019       break;
3020
3021     case BLOCKED_FETCH:
3022       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3023       next = bqe->link;
3024       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3025       PendingFetches = (StgBlockedFetch *)bqe;
3026       break;
3027
3028 # if defined(DEBUG)
3029       /* can ignore this case in a non-debugging setup; 
3030          see comments on RBHSave closures above */
3031     case CONSTR:
3032       /* check that the closure is an RBHSave closure */
3033       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3034              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3035              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3036       break;
3037
3038     default:
3039       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3040            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3041            (StgClosure *)bqe);
3042 # endif
3043     }
3044   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3045   return next;
3046 }
3047 #endif
3048
3049 StgTSO *
3050 unblockOne(Capability *cap, StgTSO *tso)
3051 {
3052   StgTSO *next;
3053
3054   ASSERT(get_itbl(tso)->type == TSO);
3055   ASSERT(tso->why_blocked != NotBlocked);
3056   tso->why_blocked = NotBlocked;
3057   next = tso->link;
3058   tso->link = END_TSO_QUEUE;
3059
3060   // We might have just migrated this TSO to our Capability:
3061   if (tso->bound) {
3062       tso->bound->cap = cap;
3063   }
3064
3065   appendToRunQueue(cap,tso);
3066
3067   // we're holding a newly woken thread, make sure we context switch
3068   // quickly so we can migrate it if necessary.
3069   context_switch = 1;
3070   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3071   return next;
3072 }
3073
3074
3075 #if defined(GRAN)
3076 void 
3077 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3078 {
3079   StgBlockingQueueElement *bqe;
3080   PEs node_loc;
3081   nat len = 0; 
3082
3083   IF_GRAN_DEBUG(bq, 
3084                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3085                       node, CurrentProc, CurrentTime[CurrentProc], 
3086                       CurrentTSO->id, CurrentTSO));
3087
3088   node_loc = where_is(node);
3089
3090   ASSERT(q == END_BQ_QUEUE ||
3091          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3092          get_itbl(q)->type == CONSTR); // closure (type constructor)
3093   ASSERT(is_unique(node));
3094
3095   /* FAKE FETCH: magically copy the node to the tso's proc;
3096      no Fetch necessary because in reality the node should not have been 
3097      moved to the other PE in the first place
3098   */
3099   if (CurrentProc!=node_loc) {
3100     IF_GRAN_DEBUG(bq, 
3101                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3102                         node, node_loc, CurrentProc, CurrentTSO->id, 
3103                         // CurrentTSO, where_is(CurrentTSO),
3104                         node->header.gran.procs));
3105     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3106     IF_GRAN_DEBUG(bq, 
3107                   debugBelch("## new bitmask of node %p is %#x\n",
3108                         node, node->header.gran.procs));
3109     if (RtsFlags.GranFlags.GranSimStats.Global) {
3110       globalGranStats.tot_fake_fetches++;
3111     }
3112   }
3113
3114   bqe = q;
3115   // ToDo: check: ASSERT(CurrentProc==node_loc);
3116   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3117     //next = bqe->link;
3118     /* 
3119        bqe points to the current element in the queue
3120        next points to the next element in the queue
3121     */
3122     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3123     //tso_loc = where_is(tso);
3124     len++;
3125     bqe = unblockOne(bqe, node);
3126   }
3127
3128   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3129      the closure to make room for the anchor of the BQ */
3130   if (bqe!=END_BQ_QUEUE) {
3131     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3132     /*
3133     ASSERT((info_ptr==&RBH_Save_0_info) ||
3134            (info_ptr==&RBH_Save_1_info) ||
3135            (info_ptr==&RBH_Save_2_info));
3136     */
3137     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3138     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3139     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3140
3141     IF_GRAN_DEBUG(bq,
3142                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3143                         node, info_type(node)));
3144   }
3145
3146   /* statistics gathering */
3147   if (RtsFlags.GranFlags.GranSimStats.Global) {
3148     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3149     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3150     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3151     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3152   }
3153   IF_GRAN_DEBUG(bq,
3154                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3155                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3156 }
3157 #elif defined(PARALLEL_HASKELL)
3158 void 
3159 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3160 {
3161   StgBlockingQueueElement *bqe;
3162
3163   IF_PAR_DEBUG(verbose, 
3164                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3165                      node, mytid));
3166 #ifdef DIST  
3167   //RFP
3168   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3169     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3170     return;
3171   }
3172 #endif
3173   
3174   ASSERT(q == END_BQ_QUEUE ||
3175          get_itbl(q)->type == TSO ||           
3176          get_itbl(q)->type == BLOCKED_FETCH || 
3177          get_itbl(q)->type == CONSTR); 
3178
3179   bqe = q;
3180   while (get_itbl(bqe)->type==TSO || 
3181          get_itbl(bqe)->type==BLOCKED_FETCH) {
3182     bqe = unblockOne(bqe, node);
3183   }
3184 }
3185
3186 #else   /* !GRAN && !PARALLEL_HASKELL */
3187
3188 void
3189 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3190 {
3191     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3192                              // Exception.cmm
3193     while (tso != END_TSO_QUEUE) {
3194         tso = unblockOne(cap,tso);
3195     }
3196 }
3197 #endif
3198
3199 /* ---------------------------------------------------------------------------
3200    Interrupt execution
3201    - usually called inside a signal handler so it mustn't do anything fancy.   
3202    ------------------------------------------------------------------------ */
3203
3204 void
3205 interruptStgRts(void)
3206 {
3207     interrupted    = 1;
3208     context_switch = 1;
3209 #if defined(THREADED_RTS)
3210     prodAllCapabilities();
3211 #endif
3212 }
3213
3214 /* -----------------------------------------------------------------------------
3215    Unblock a thread
3216
3217    This is for use when we raise an exception in another thread, which
3218    may be blocked.
3219    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3220    -------------------------------------------------------------------------- */
3221
3222 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3223 /*
3224   NB: only the type of the blocking queue is different in GranSim and GUM
3225       the operations on the queue-elements are the same
3226       long live polymorphism!
3227
3228   Locks: sched_mutex is held upon entry and exit.
3229
3230 */
3231 static void
3232 unblockThread(Capability *cap, StgTSO *tso)
3233 {
3234   StgBlockingQueueElement *t, **last;
3235
3236   switch (tso->why_blocked) {
3237
3238   case NotBlocked:
3239     return;  /* not blocked */
3240
3241   case BlockedOnSTM:
3242     // Be careful: nothing to do here!  We tell the scheduler that the thread
3243     // is runnable and we leave it to the stack-walking code to abort the 
3244     // transaction while unwinding the stack.  We should perhaps have a debugging
3245     // test to make sure that this really happens and that the 'zombie' transaction
3246     // does not get committed.
3247     goto done;
3248
3249   case BlockedOnMVar:
3250     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3251     {
3252       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3253       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3254
3255       last = (StgBlockingQueueElement **)&mvar->head;
3256       for (t = (StgBlockingQueueElement *)mvar->head; 
3257            t != END_BQ_QUEUE; 
3258            last = &t->link, last_tso = t, t = t->link) {
3259         if (t == (StgBlockingQueueElement *)tso) {
3260           *last = (StgBlockingQueueElement *)tso->link;
3261           if (mvar->tail == tso) {
3262             mvar->tail = (StgTSO *)last_tso;
3263           }
3264           goto done;
3265         }
3266       }
3267       barf("unblockThread (MVAR): TSO not found");
3268     }
3269
3270   case BlockedOnBlackHole:
3271     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3272     {
3273       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3274
3275       last = &bq->blocking_queue;
3276       for (t = bq->blocking_queue; 
3277            t != END_BQ_QUEUE; 
3278            last = &t->link, t = t->link) {
3279         if (t == (StgBlockingQueueElement *)tso) {
3280           *last = (StgBlockingQueueElement *)tso->link;
3281           goto done;
3282         }
3283       }
3284       barf("unblockThread (BLACKHOLE): TSO not found");
3285     }
3286
3287   case BlockedOnException:
3288     {
3289       StgTSO *target  = tso->block_info.tso;
3290
3291       ASSERT(get_itbl(target)->type == TSO);
3292
3293       if (target->what_next == ThreadRelocated) {
3294           target = target->link;
3295           ASSERT(get_itbl(target)->type == TSO);
3296       }
3297
3298       ASSERT(target->blocked_exceptions != NULL);
3299
3300       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3301       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3302            t != END_BQ_QUEUE; 
3303            last = &t->link, t = t->link) {
3304         ASSERT(get_itbl(t)->type == TSO);
3305         if (t == (StgBlockingQueueElement *)tso) {
3306           *last = (StgBlockingQueueElement *)tso->link;
3307           goto done;
3308         }
3309       }
3310       barf("unblockThread (Exception): TSO not found");
3311     }
3312
3313   case BlockedOnRead:
3314   case BlockedOnWrite:
3315 #if defined(mingw32_HOST_OS)
3316   case BlockedOnDoProc:
3317 #endif
3318     {
3319       /* take TSO off blocked_queue */
3320       StgBlockingQueueElement *prev = NULL;
3321       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3322            prev = t, t = t->link) {
3323         if (t == (StgBlockingQueueElement *)tso) {
3324           if (prev == NULL) {
3325             blocked_queue_hd = (StgTSO *)t->link;
3326             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3327               blocked_queue_tl = END_TSO_QUEUE;
3328             }
3329           } else {
3330             prev->link = t->link;
3331             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3332               blocked_queue_tl = (StgTSO *)prev;
3333             }
3334           }
3335 #if defined(mingw32_HOST_OS)
3336           /* (Cooperatively) signal that the worker thread should abort
3337            * the request.
3338            */
3339           abandonWorkRequest(tso->block_info.async_result->reqID);
3340 #endif
3341           goto done;
3342         }
3343       }
3344       barf("unblockThread (I/O): TSO not found");
3345     }
3346
3347   case BlockedOnDelay:
3348     {
3349       /* take TSO off sleeping_queue */
3350       StgBlockingQueueElement *prev = NULL;
3351       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3352            prev = t, t = t->link) {
3353         if (t == (StgBlockingQueueElement *)tso) {
3354           if (prev == NULL) {
3355             sleeping_queue = (StgTSO *)t->link;
3356           } else {
3357             prev->link = t->link;
3358           }
3359           goto done;
3360         }
3361       }
3362       barf("unblockThread (delay): TSO not found");
3363     }
3364
3365   default:
3366     barf("unblockThread");
3367   }
3368
3369  done:
3370   tso->link = END_TSO_QUEUE;
3371   tso->why_blocked = NotBlocked;
3372   tso->block_info.closure = NULL;
3373   pushOnRunQueue(cap,tso);
3374 }
3375 #else
3376 static void
3377 unblockThread(Capability *cap, StgTSO *tso)
3378 {
3379   StgTSO *t, **last;
3380   
3381   /* To avoid locking unnecessarily. */
3382   if (tso->why_blocked == NotBlocked) {
3383     return;
3384   }
3385
3386   switch (tso->why_blocked) {
3387
3388   case BlockedOnSTM:
3389     // Be careful: nothing to do here!  We tell the scheduler that the thread
3390     // is runnable and we leave it to the stack-walking code to abort the 
3391     // transaction while unwinding the stack.  We should perhaps have a debugging
3392     // test to make sure that this really happens and that the 'zombie' transaction
3393     // does not get committed.
3394     goto done;
3395
3396   case BlockedOnMVar:
3397     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3398     {
3399       StgTSO *last_tso = END_TSO_QUEUE;
3400       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3401
3402       last = &mvar->head;
3403       for (t = mvar->head; t != END_TSO_QUEUE; 
3404            last = &t->link, last_tso = t, t = t->link) {
3405         if (t == tso) {
3406           *last = tso->link;
3407           if (mvar->tail == tso) {
3408             mvar->tail = last_tso;
3409           }
3410           goto done;
3411         }
3412       }
3413       barf("unblockThread (MVAR): TSO not found");
3414     }
3415
3416   case BlockedOnBlackHole:
3417     {
3418       last = &blackhole_queue;
3419       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3420            last = &t->link, t = t->link) {
3421         if (t == tso) {
3422           *last = tso->link;
3423           goto done;
3424         }
3425       }
3426       barf("unblockThread (BLACKHOLE): TSO not found");
3427     }
3428
3429   case BlockedOnException:
3430     {
3431       StgTSO *target  = tso->block_info.tso;
3432
3433       ASSERT(get_itbl(target)->type == TSO);
3434
3435       while (target->what_next == ThreadRelocated) {
3436           target = target->link;
3437           ASSERT(get_itbl(target)->type == TSO);
3438       }
3439       
3440       ASSERT(target->blocked_exceptions != NULL);
3441
3442       last = &target->blocked_exceptions;
3443       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3444            last = &t->link, t = t->link) {
3445         ASSERT(get_itbl(t)->type == TSO);
3446         if (t == tso) {
3447           *last = tso->link;
3448           goto done;
3449         }
3450       }
3451       barf("unblockThread (Exception): TSO not found");
3452     }
3453
3454 #if !defined(THREADED_RTS)
3455   case BlockedOnRead:
3456   case BlockedOnWrite:
3457 #if defined(mingw32_HOST_OS)
3458   case BlockedOnDoProc:
3459 #endif
3460     {
3461       StgTSO *prev = NULL;
3462       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3463            prev = t, t = t->link) {
3464         if (t == tso) {
3465           if (prev == NULL) {
3466             blocked_queue_hd = t->link;
3467             if (blocked_queue_tl == t) {
3468               blocked_queue_tl = END_TSO_QUEUE;
3469             }
3470           } else {
3471             prev->link = t->link;
3472             if (blocked_queue_tl == t) {
3473               blocked_queue_tl = prev;
3474             }
3475           }
3476 #if defined(mingw32_HOST_OS)
3477           /* (Cooperatively) signal that the worker thread should abort
3478            * the request.
3479            */
3480           abandonWorkRequest(tso->block_info.async_result->reqID);
3481 #endif
3482           goto done;
3483         }
3484       }
3485       barf("unblockThread (I/O): TSO not found");
3486     }
3487
3488   case BlockedOnDelay:
3489     {
3490       StgTSO *prev = NULL;
3491       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3492            prev = t, t = t->link) {
3493         if (t == tso) {
3494           if (prev == NULL) {
3495             sleeping_queue = t->link;
3496           } else {
3497             prev->link = t->link;
3498           }
3499           goto done;
3500         }
3501       }
3502       barf("unblockThread (delay): TSO not found");
3503     }
3504 #endif
3505
3506   default:
3507     barf("unblockThread");
3508   }
3509
3510  done:
3511   tso->link = END_TSO_QUEUE;
3512   tso->why_blocked = NotBlocked;
3513   tso->block_info.closure = NULL;
3514   appendToRunQueue(cap,tso);
3515 }
3516 #endif
3517
3518 /* -----------------------------------------------------------------------------
3519  * checkBlackHoles()
3520  *
3521  * Check the blackhole_queue for threads that can be woken up.  We do
3522  * this periodically: before every GC, and whenever the run queue is
3523  * empty.
3524  *
3525  * An elegant solution might be to just wake up all the blocked
3526  * threads with awakenBlockedQueue occasionally: they'll go back to
3527  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3528  * doesn't give us a way to tell whether we've actually managed to
3529  * wake up any threads, so we would be busy-waiting.
3530  *
3531  * -------------------------------------------------------------------------- */
3532
3533 static rtsBool
3534 checkBlackHoles (Capability *cap)
3535 {
3536     StgTSO **prev, *t;
3537     rtsBool any_woke_up = rtsFalse;
3538     StgHalfWord type;
3539
3540     // blackhole_queue is global:
3541     ASSERT_LOCK_HELD(&sched_mutex);
3542
3543     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3544
3545     // ASSUMES: sched_mutex
3546     prev = &blackhole_queue;
3547     t = blackhole_queue;
3548     while (t != END_TSO_QUEUE) {
3549         ASSERT(t->why_blocked == BlockedOnBlackHole);
3550         type = get_itbl(t->block_info.closure)->type;
3551         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3552             IF_DEBUG(sanity,checkTSO(t));
3553             t = unblockOne(cap, t);
3554             // urk, the threads migrate to the current capability
3555             // here, but we'd like to keep them on the original one.
3556             *prev = t;
3557             any_woke_up = rtsTrue;
3558         } else {
3559             prev = &t->link;
3560             t = t->link;
3561         }
3562     }
3563
3564     return any_woke_up;
3565 }
3566
3567 /* -----------------------------------------------------------------------------
3568  * raiseAsync()
3569  *
3570  * The following function implements the magic for raising an
3571  * asynchronous exception in an existing thread.
3572  *
3573  * We first remove the thread from any queue on which it might be
3574  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3575  *
3576  * We strip the stack down to the innermost CATCH_FRAME, building
3577  * thunks in the heap for all the active computations, so they can 
3578  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3579  * an application of the handler to the exception, and push it on
3580  * the top of the stack.
3581  * 
3582  * How exactly do we save all the active computations?  We create an
3583  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3584  * AP_STACKs pushes everything from the corresponding update frame
3585  * upwards onto the stack.  (Actually, it pushes everything up to the
3586  * next update frame plus a pointer to the next AP_STACK object.
3587  * Entering the next AP_STACK object pushes more onto the stack until we
3588  * reach the last AP_STACK object - at which point the stack should look
3589  * exactly as it did when we killed the TSO and we can continue
3590  * execution by entering the closure on top of the stack.
3591  *
3592  * We can also kill a thread entirely - this happens if either (a) the 
3593  * exception passed to raiseAsync is NULL, or (b) there's no
3594  * CATCH_FRAME on the stack.  In either case, we strip the entire
3595  * stack and replace the thread with a zombie.
3596  *
3597  * ToDo: in SMP mode, this function is only safe if either (a) we hold
3598  * all the Capabilities (eg. in GC), or (b) we own the Capability that
3599  * the TSO is currently blocked on or on the run queue of.
3600  *
3601  * -------------------------------------------------------------------------- */
3602  
3603 void
3604 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3605 {
3606     raiseAsync_(cap, tso, exception, rtsFalse);
3607 }
3608
3609 static void
3610 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3611             rtsBool stop_at_atomically)
3612 {
3613     StgRetInfoTable *info;
3614     StgPtr sp;
3615   
3616     // Thread already dead?
3617     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3618         return;
3619     }
3620
3621     IF_DEBUG(scheduler, 
3622              sched_belch("raising exception in thread %ld.", (long)tso->id));
3623     
3624     // Remove it from any blocking queues
3625     unblockThread(cap,tso);
3626
3627     sp = tso->sp;
3628     
3629     // The stack freezing code assumes there's a closure pointer on
3630     // the top of the stack, so we have to arrange that this is the case...
3631     //
3632     if (sp[0] == (W_)&stg_enter_info) {
3633         sp++;
3634     } else {
3635         sp--;
3636         sp[0] = (W_)&stg_dummy_ret_closure;
3637     }
3638
3639     while (1) {
3640         nat i;
3641
3642         // 1. Let the top of the stack be the "current closure"
3643         //
3644         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3645         // CATCH_FRAME.
3646         //
3647         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3648         // current closure applied to the chunk of stack up to (but not
3649         // including) the update frame.  This closure becomes the "current
3650         // closure".  Go back to step 2.
3651         //
3652         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3653         // top of the stack applied to the exception.
3654         // 
3655         // 5. If it's a STOP_FRAME, then kill the thread.
3656         // 
3657         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3658         // transaction
3659        
3660         
3661         StgPtr frame;
3662         
3663         frame = sp + 1;
3664         info = get_ret_itbl((StgClosure *)frame);
3665         
3666         while (info->i.type != UPDATE_FRAME
3667                && (info->i.type != CATCH_FRAME || exception == NULL)
3668                && info->i.type != STOP_FRAME
3669                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3670         {
3671             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3672               // IF we find an ATOMICALLY_FRAME then we abort the
3673               // current transaction and propagate the exception.  In
3674               // this case (unlike ordinary exceptions) we do not care
3675               // whether the transaction is valid or not because its
3676               // possible validity cannot have caused the exception
3677               // and will not be visible after the abort.
3678               IF_DEBUG(stm,
3679                        debugBelch("Found atomically block delivering async exception\n"));
3680               stmAbortTransaction(tso -> trec);
3681               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3682             }
3683             frame += stack_frame_sizeW((StgClosure *)frame);
3684             info = get_ret_itbl((StgClosure *)frame);
3685         }
3686         
3687         switch (info->i.type) {
3688             
3689         case ATOMICALLY_FRAME:
3690             ASSERT(stop_at_atomically);
3691             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3692             stmCondemnTransaction(tso -> trec);
3693 #ifdef REG_R1
3694             tso->sp = frame;
3695 #else
3696             // R1 is not a register: the return convention for IO in
3697             // this case puts the return value on the stack, so we
3698             // need to set up the stack to return to the atomically
3699             // frame properly...
3700             tso->sp = frame - 2;
3701             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3702             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3703 #endif
3704             tso->what_next = ThreadRunGHC;
3705             return;
3706
3707         case CATCH_FRAME:
3708             // If we find a CATCH_FRAME, and we've got an exception to raise,
3709             // then build the THUNK raise(exception), and leave it on
3710             // top of the CATCH_FRAME ready to enter.
3711             //
3712         {
3713 #ifdef PROFILING
3714             StgCatchFrame *cf = (StgCatchFrame *)frame;
3715 #endif
3716             StgThunk *raise;
3717             
3718             // we've got an exception to raise, so let's pass it to the
3719             // handler in this frame.
3720             //
3721             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3722             TICK_ALLOC_SE_THK(1,0);
3723             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3724             raise->payload[0] = exception;
3725             
3726             // throw away the stack from Sp up to the CATCH_FRAME.
3727             //
3728             sp = frame - 1;
3729             
3730             /* Ensure that async excpetions are blocked now, so we don't get
3731              * a surprise exception before we get around to executing the
3732              * handler.
3733              */
3734             if (tso->blocked_exceptions == NULL) {
3735                 tso->blocked_exceptions = END_TSO_QUEUE;
3736             }
3737             
3738             /* Put the newly-built THUNK on top of the stack, ready to execute
3739              * when the thread restarts.
3740              */
3741             sp[0] = (W_)raise;
3742             sp[-1] = (W_)&stg_enter_info;
3743             tso->sp = sp-1;
3744             tso->what_next = ThreadRunGHC;
3745             IF_DEBUG(sanity, checkTSO(tso));
3746             return;
3747         }
3748         
3749         case UPDATE_FRAME:
3750         {
3751             StgAP_STACK * ap;
3752             nat words;
3753             
3754             // First build an AP_STACK consisting of the stack chunk above the
3755             // current update frame, with the top word on the stack as the
3756             // fun field.
3757             //
3758             words = frame - sp - 1;
3759             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3760             
3761             ap->size = words;
3762             ap->fun  = (StgClosure *)sp[0];
3763             sp++;
3764             for(i=0; i < (nat)words; ++i) {
3765                 ap->payload[i] = (StgClosure *)*sp++;
3766             }
3767             
3768             SET_HDR(ap,&stg_AP_STACK_info,
3769                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3770             TICK_ALLOC_UP_THK(words+1,0);
3771             
3772             IF_DEBUG(scheduler,
3773                      debugBelch("sched: Updating ");
3774                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3775                      debugBelch(" with ");
3776                      printObj((StgClosure *)ap);
3777                 );
3778
3779             // Replace the updatee with an indirection - happily
3780             // this will also wake up any threads currently
3781             // waiting on the result.
3782             //
3783             // Warning: if we're in a loop, more than one update frame on
3784             // the stack may point to the same object.  Be careful not to
3785             // overwrite an IND_OLDGEN in this case, because we'll screw
3786             // up the mutable lists.  To be on the safe side, don't
3787             // overwrite any kind of indirection at all.  See also
3788             // threadSqueezeStack in GC.c, where we have to make a similar
3789             // check.
3790             //
3791             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3792                 // revert the black hole
3793                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3794                                (StgClosure *)ap);
3795             }
3796             sp += sizeofW(StgUpdateFrame) - 1;
3797             sp[0] = (W_)ap; // push onto stack
3798             break;
3799         }
3800         
3801         case STOP_FRAME:
3802             // We've stripped the entire stack, the thread is now dead.
3803             sp += sizeofW(StgStopFrame);
3804             tso->what_next = ThreadKilled;
3805             tso->sp = sp;
3806             return;
3807             
3808         default:
3809             barf("raiseAsync");
3810         }
3811     }
3812     barf("raiseAsync");
3813 }
3814
3815 /* -----------------------------------------------------------------------------
3816    Deleting threads
3817
3818    This is used for interruption (^C) and forking, and corresponds to
3819    raising an exception but without letting the thread catch the
3820    exception.
3821    -------------------------------------------------------------------------- */
3822
3823 static void 
3824 deleteThread (Capability *cap, StgTSO *tso)
3825 {
3826   if (tso->why_blocked != BlockedOnCCall &&
3827       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3828       raiseAsync(cap,tso,NULL);
3829   }
3830 }
3831
3832 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3833 static void 
3834 deleteThreadImmediately(Capability *cap, StgTSO *tso)
3835 { // for forkProcess only:
3836   // delete thread without giving it a chance to catch the KillThread exception
3837
3838   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3839       return;
3840   }
3841
3842   if (tso->why_blocked != BlockedOnCCall &&
3843       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3844       unblockThread(cap,tso);
3845   }
3846
3847   tso->what_next = ThreadKilled;
3848 }
3849 #endif
3850
3851 /* -----------------------------------------------------------------------------
3852    raiseExceptionHelper
3853    
3854    This function is called by the raise# primitve, just so that we can
3855    move some of the tricky bits of raising an exception from C-- into
3856    C.  Who knows, it might be a useful re-useable thing here too.
3857    -------------------------------------------------------------------------- */
3858
3859 StgWord
3860 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
3861 {
3862     Capability *cap = regTableToCapability(reg);
3863     StgThunk *raise_closure = NULL;
3864     StgPtr p, next;
3865     StgRetInfoTable *info;
3866     //
3867     // This closure represents the expression 'raise# E' where E
3868     // is the exception raise.  It is used to overwrite all the
3869     // thunks which are currently under evaluataion.
3870     //
3871
3872     //    
3873     // LDV profiling: stg_raise_info has THUNK as its closure
3874     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3875     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3876     // 1 does not cause any problem unless profiling is performed.
3877     // However, when LDV profiling goes on, we need to linearly scan
3878     // small object pool, where raise_closure is stored, so we should
3879     // use MIN_UPD_SIZE.
3880     //
3881     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3882     //                                 sizeofW(StgClosure)+1);
3883     //
3884
3885     //
3886     // Walk up the stack, looking for the catch frame.  On the way,
3887     // we update any closures pointed to from update frames with the
3888     // raise closure that we just built.
3889     //
3890     p = tso->sp;
3891     while(1) {
3892         info = get_ret_itbl((StgClosure *)p);
3893         next = p + stack_frame_sizeW((StgClosure *)p);
3894         switch (info->i.type) {
3895             
3896         case UPDATE_FRAME:
3897             // Only create raise_closure if we need to.
3898             if (raise_closure == NULL) {
3899                 raise_closure = 
3900                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
3901                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3902                 raise_closure->payload[0] = exception;
3903             }
3904             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
3905             p = next;
3906             continue;
3907
3908         case ATOMICALLY_FRAME:
3909             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3910             tso->sp = p;
3911             return ATOMICALLY_FRAME;
3912             
3913         case CATCH_FRAME:
3914             tso->sp = p;
3915             return CATCH_FRAME;
3916
3917         case CATCH_STM_FRAME:
3918             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3919             tso->sp = p;
3920             return CATCH_STM_FRAME;
3921             
3922         case STOP_FRAME:
3923             tso->sp = p;
3924             return STOP_FRAME;
3925
3926         case CATCH_RETRY_FRAME:
3927         default:
3928             p = next; 
3929             continue;
3930         }
3931     }
3932 }
3933
3934
3935 /* -----------------------------------------------------------------------------
3936    findRetryFrameHelper
3937
3938    This function is called by the retry# primitive.  It traverses the stack
3939    leaving tso->sp referring to the frame which should handle the retry.  
3940
3941    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3942    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3943
3944    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3945    despite the similar implementation.
3946
3947    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3948    not be created within memory transactions.
3949    -------------------------------------------------------------------------- */
3950
3951 StgWord
3952 findRetryFrameHelper (StgTSO *tso)
3953 {
3954   StgPtr           p, next;
3955   StgRetInfoTable *info;
3956
3957   p = tso -> sp;
3958   while (1) {
3959     info = get_ret_itbl((StgClosure *)p);
3960     next = p + stack_frame_sizeW((StgClosure *)p);
3961     switch (info->i.type) {
3962       
3963     case ATOMICALLY_FRAME:
3964       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3965       tso->sp = p;
3966       return ATOMICALLY_FRAME;
3967       
3968     case CATCH_RETRY_FRAME:
3969       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3970       tso->sp = p;
3971       return CATCH_RETRY_FRAME;
3972       
3973     case CATCH_STM_FRAME:
3974     default:
3975       ASSERT(info->i.type != CATCH_FRAME);
3976       ASSERT(info->i.type != STOP_FRAME);
3977       p = next; 
3978       continue;
3979     }
3980   }
3981 }
3982
3983 /* -----------------------------------------------------------------------------
3984    resurrectThreads is called after garbage collection on the list of
3985    threads found to be garbage.  Each of these threads will be woken
3986    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3987    on an MVar, or NonTermination if the thread was blocked on a Black
3988    Hole.
3989
3990    Locks: assumes we hold *all* the capabilities.
3991    -------------------------------------------------------------------------- */
3992
3993 void
3994 resurrectThreads (StgTSO *threads)
3995 {
3996     StgTSO *tso, *next;
3997     Capability *cap;
3998
3999     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4000         next = tso->global_link;
4001         tso->global_link = all_threads;
4002         all_threads = tso;
4003         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4004         
4005         // Wake up the thread on the Capability it was last on for a
4006         // bound thread, or last_free_capability otherwise.
4007         if (tso->bound) {
4008             cap = tso->bound->cap;
4009         } else {
4010             cap = last_free_capability;
4011         }
4012         
4013         switch (tso->why_blocked) {
4014         case BlockedOnMVar:
4015         case BlockedOnException:
4016             /* Called by GC - sched_mutex lock is currently held. */
4017             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4018             break;
4019         case BlockedOnBlackHole:
4020             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4021             break;
4022         case BlockedOnSTM:
4023             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4024             break;
4025         case NotBlocked:
4026             /* This might happen if the thread was blocked on a black hole
4027              * belonging to a thread that we've just woken up (raiseAsync
4028              * can wake up threads, remember...).
4029              */
4030             continue;
4031         default:
4032             barf("resurrectThreads: thread blocked in a strange way");
4033         }
4034     }
4035 }
4036
4037 /* ----------------------------------------------------------------------------
4038  * Debugging: why is a thread blocked
4039  * [Also provides useful information when debugging threaded programs
4040  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4041    ------------------------------------------------------------------------- */
4042
4043 #if DEBUG
4044 static void
4045 printThreadBlockage(StgTSO *tso)
4046 {
4047   switch (tso->why_blocked) {
4048   case BlockedOnRead:
4049     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4050     break;
4051   case BlockedOnWrite:
4052     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4053     break;
4054 #if defined(mingw32_HOST_OS)
4055     case BlockedOnDoProc:
4056     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4057     break;
4058 #endif
4059   case BlockedOnDelay:
4060     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4061     break;
4062   case BlockedOnMVar:
4063     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4064     break;
4065   case BlockedOnException:
4066     debugBelch("is blocked on delivering an exception to thread %d",
4067             tso->block_info.tso->id);
4068     break;
4069   case BlockedOnBlackHole:
4070     debugBelch("is blocked on a black hole");
4071     break;
4072   case NotBlocked:
4073     debugBelch("is not blocked");
4074     break;
4075 #if defined(PARALLEL_HASKELL)
4076   case BlockedOnGA:
4077     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4078             tso->block_info.closure, info_type(tso->block_info.closure));
4079     break;
4080   case BlockedOnGA_NoSend:
4081     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4082             tso->block_info.closure, info_type(tso->block_info.closure));
4083     break;
4084 #endif
4085   case BlockedOnCCall:
4086     debugBelch("is blocked on an external call");
4087     break;
4088   case BlockedOnCCall_NoUnblockExc:
4089     debugBelch("is blocked on an external call (exceptions were already blocked)");
4090     break;
4091   case BlockedOnSTM:
4092     debugBelch("is blocked on an STM operation");
4093     break;
4094   default:
4095     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4096          tso->why_blocked, tso->id, tso);
4097   }
4098 }
4099
4100 void
4101 printThreadStatus(StgTSO *t)
4102 {
4103     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4104     {
4105       void *label = lookupThreadLabel(t->id);
4106       if (label) debugBelch("[\"%s\"] ",(char *)label);
4107     }
4108     if (t->what_next == ThreadRelocated) {
4109         debugBelch("has been relocated...\n");
4110     } else {
4111         switch (t->what_next) {
4112         case ThreadKilled:
4113             debugBelch("has been killed");
4114             break;
4115         case ThreadComplete:
4116             debugBelch("has completed");
4117             break;
4118         default:
4119             printThreadBlockage(t);
4120         }
4121         debugBelch("\n");
4122     }
4123 }
4124
4125 void
4126 printAllThreads(void)
4127 {
4128   StgTSO *t, *next;
4129   nat i;
4130   Capability *cap;
4131
4132 # if defined(GRAN)
4133   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4134   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4135                        time_string, rtsFalse/*no commas!*/);
4136
4137   debugBelch("all threads at [%s]:\n", time_string);
4138 # elif defined(PARALLEL_HASKELL)
4139   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4140   ullong_format_string(CURRENT_TIME,
4141                        time_string, rtsFalse/*no commas!*/);
4142
4143   debugBelch("all threads at [%s]:\n", time_string);
4144 # else
4145   debugBelch("all threads:\n");
4146 # endif
4147
4148   for (i = 0; i < n_capabilities; i++) {
4149       cap = &capabilities[i];
4150       debugBelch("threads on capability %d:\n", cap->no);
4151       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4152           printThreadStatus(t);
4153       }
4154   }
4155
4156   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4157       if (t->why_blocked != NotBlocked) {
4158           printThreadStatus(t);
4159       }
4160       if (t->what_next == ThreadRelocated) {
4161           next = t->link;
4162       } else {
4163           next = t->global_link;
4164       }
4165   }
4166 }
4167
4168 // useful from gdb
4169 void 
4170 printThreadQueue(StgTSO *t)
4171 {
4172     nat i = 0;
4173     for (; t != END_TSO_QUEUE; t = t->link) {
4174         printThreadStatus(t);
4175         i++;
4176     }
4177     debugBelch("%d threads on queue\n", i);
4178 }
4179
4180 /* 
4181    Print a whole blocking queue attached to node (debugging only).
4182 */
4183 # if defined(PARALLEL_HASKELL)
4184 void 
4185 print_bq (StgClosure *node)
4186 {
4187   StgBlockingQueueElement *bqe;
4188   StgTSO *tso;
4189   rtsBool end;
4190
4191   debugBelch("## BQ of closure %p (%s): ",
4192           node, info_type(node));
4193
4194   /* should cover all closures that may have a blocking queue */
4195   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4196          get_itbl(node)->type == FETCH_ME_BQ ||
4197          get_itbl(node)->type == RBH ||
4198          get_itbl(node)->type == MVAR);
4199     
4200   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4201
4202   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4203 }
4204
4205 /* 
4206    Print a whole blocking queue starting with the element bqe.
4207 */
4208 void 
4209 print_bqe (StgBlockingQueueElement *bqe)
4210 {
4211   rtsBool end;
4212
4213   /* 
4214      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4215   */
4216   for (end = (bqe==END_BQ_QUEUE);
4217        !end; // iterate until bqe points to a CONSTR
4218        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4219        bqe = end ? END_BQ_QUEUE : bqe->link) {
4220     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4221     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4222     /* types of closures that may appear in a blocking queue */
4223     ASSERT(get_itbl(bqe)->type == TSO ||           
4224            get_itbl(bqe)->type == BLOCKED_FETCH || 
4225            get_itbl(bqe)->type == CONSTR); 
4226     /* only BQs of an RBH end with an RBH_Save closure */
4227     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4228
4229     switch (get_itbl(bqe)->type) {
4230     case TSO:
4231       debugBelch(" TSO %u (%x),",
4232               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4233       break;
4234     case BLOCKED_FETCH:
4235       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4236               ((StgBlockedFetch *)bqe)->node, 
4237               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4238               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4239               ((StgBlockedFetch *)bqe)->ga.weight);
4240       break;
4241     case CONSTR:
4242       debugBelch(" %s (IP %p),",
4243               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4244                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4245                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4246                "RBH_Save_?"), get_itbl(bqe));
4247       break;
4248     default:
4249       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4250            info_type((StgClosure *)bqe)); // , node, info_type(node));
4251       break;
4252     }
4253   } /* for */
4254   debugBelch("\n");
4255 }
4256 # elif defined(GRAN)
4257 void 
4258 print_bq (StgClosure *node)
4259 {
4260   StgBlockingQueueElement *bqe;
4261   PEs node_loc, tso_loc;
4262   rtsBool end;
4263
4264   /* should cover all closures that may have a blocking queue */
4265   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4266          get_itbl(node)->type == FETCH_ME_BQ ||
4267          get_itbl(node)->type == RBH);
4268     
4269   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4270   node_loc = where_is(node);
4271
4272   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4273           node, info_type(node), node_loc);
4274
4275   /* 
4276      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4277   */
4278   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4279        !end; // iterate until bqe points to a CONSTR
4280        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4281     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4282     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4283     /* types of closures that may appear in a blocking queue */
4284     ASSERT(get_itbl(bqe)->type == TSO ||           
4285            get_itbl(bqe)->type == CONSTR); 
4286     /* only BQs of an RBH end with an RBH_Save closure */
4287     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4288
4289     tso_loc = where_is((StgClosure *)bqe);
4290     switch (get_itbl(bqe)->type) {
4291     case TSO:
4292       debugBelch(" TSO %d (%p) on [PE %d],",
4293               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4294       break;
4295     case CONSTR:
4296       debugBelch(" %s (IP %p),",
4297               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4298                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4299                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4300                "RBH_Save_?"), get_itbl(bqe));
4301       break;
4302     default:
4303       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4304            info_type((StgClosure *)bqe), node, info_type(node));
4305       break;
4306     }
4307   } /* for */
4308   debugBelch("\n");
4309 }
4310 # endif
4311
4312 #if defined(PARALLEL_HASKELL)
4313 static nat
4314 run_queue_len(void)
4315 {
4316     nat i;
4317     StgTSO *tso;
4318     
4319     for (i=0, tso=run_queue_hd; 
4320          tso != END_TSO_QUEUE;
4321          i++, tso=tso->link) {
4322         /* nothing */
4323     }
4324         
4325     return i;
4326 }
4327 #endif
4328
4329 void
4330 sched_belch(char *s, ...)
4331 {
4332     va_list ap;
4333     va_start(ap,s);
4334 #ifdef THREADED_RTS
4335     debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());
4336 #elif defined(PARALLEL_HASKELL)
4337     debugBelch("== ");
4338 #else
4339     debugBelch("sched: ");
4340 #endif
4341     vdebugBelch(s, ap);
4342     debugBelch("\n");
4343     va_end(ap);
4344 }
4345
4346 #endif /* DEBUG */